From 184436dbe4e602e5318fb6ecf92ac1afdadd926b Mon Sep 17 00:00:00 2001 From: Marc Aymerich Date: Sun, 3 May 2015 17:44:46 +0000 Subject: [PATCH] Added tasks app --- TODO.md | 53 ++++++-- orchestra/__init__.py | 2 + orchestra/apps.py | 6 + orchestra/bin/orchestra-admin | 8 ++ orchestra/contrib/domains/settings.py | 6 +- orchestra/contrib/orchestration/tasks.py | 3 +- orchestra/contrib/resources/models.py | 3 +- orchestra/contrib/resources/tasks.py | 5 +- orchestra/contrib/settings/admin.py | 11 ++ orchestra/contrib/tasks/__init__.py | 103 +++++++++++++++ orchestra/contrib/tasks/admin.py | 9 ++ orchestra/contrib/tasks/beat.py | 43 +++++++ orchestra/contrib/tasks/bin/orchestra-beat | 83 ++++++++++++ .../contrib/tasks/management/commands/beat.py | 10 ++ .../tasks/management/commands/runfunction.py | 32 +++++ .../tasks/management/commands/runtask.py | 49 ++++++++ .../management/commands/syncperiodictasks.py | 2 + orchestra/contrib/tasks/parser.py | 61 +++++++++ orchestra/contrib/tasks/schedules.py | 119 ++++++++++++++++++ orchestra/contrib/tasks/settings.py | 11 ++ orchestra/contrib/tasks/utils.py | 12 ++ orchestra/forms/widgets.py | 2 +- orchestra/models/utils.py | 6 +- orchestra/settings.py | 24 ++-- orchestra/utils/db.py | 49 ++++++++ orchestra/utils/sys.py | 22 ++-- requirements.txt | 2 +- setup.py | 5 +- 28 files changed, 695 insertions(+), 46 deletions(-) create mode 100644 orchestra/apps.py create mode 100644 orchestra/contrib/tasks/__init__.py create mode 100644 orchestra/contrib/tasks/admin.py create mode 100644 orchestra/contrib/tasks/beat.py create mode 100755 orchestra/contrib/tasks/bin/orchestra-beat create mode 100644 orchestra/contrib/tasks/management/commands/beat.py create mode 100644 orchestra/contrib/tasks/management/commands/runfunction.py create mode 100644 orchestra/contrib/tasks/management/commands/runtask.py create mode 100644 orchestra/contrib/tasks/management/commands/syncperiodictasks.py create mode 100644 orchestra/contrib/tasks/parser.py create mode 100644 orchestra/contrib/tasks/schedules.py create mode 100644 orchestra/contrib/tasks/settings.py create mode 100644 orchestra/contrib/tasks/utils.py create mode 100644 orchestra/utils/db.py diff --git a/TODO.md b/TODO.md index 06435054..422986bd 100644 --- a/TODO.md +++ b/TODO.md @@ -297,13 +297,8 @@ https://code.djangoproject.com/ticket/24576 # admin edit relevant djanog settings # django SITE_NAME vs ORCHESTRA_SITE_NAME ? -# accounts.migrations link to last auth migration instead of first - - -# DNS allow transfer other NS servers instead of masters and slaves! Replace celery by a custom solution? - # TODO create periodic task like settings, but parsing cronfiles! # TODO create decorator wrapper that abstract the task away from the backen (cron/celery) # TODO crontab model localhost/autoadded attribute * No more jumbo dependencies and wierd bugs @@ -316,15 +311,49 @@ Replace celery by a custom solution? *priority: custom Thread backend *bulk: wrapper arround django-mailer to avoid loading django system +python3 -mvenv env-django-orchestra +source env-django-orchestra/bin/activate +pip3 install django-orchestra==dev --allow-external django-orchestra --allow-unverified django-orchestra +pip3 install -r https://raw.githubusercontent.com/glic3rinu/django-orchestra/master/requirements.txt +# TODO make them optional +sudo apt-get install python3.4-dev libxml2-dev libxslt1-dev libcrack2-dev +wget -O - https://raw.githubusercontent.com/glic3rinu/django-orchestra/master/requirements.txt | xargs pip3 install +django-admin.py startproject panel --template="$HOME/django-orchestra/orchestra/conf/project_template" +python3 panel/manage.py migrate accounts +python3 panel/manage.py migrate +python3 panel/manage.py runserver + + + + + +Collecting lxml==3.3.5 (from -r re (line 22)) + Downloading lxml-3.3.5.tar.gz (3.5MB) + 100% |################################| 3.5MB 60kB/s + Building lxml version 3.3.5. + Building without Cython. + ERROR: b'/bin/sh: 1: xslt-config: not found\n' + ** make sure the development packages of libxml2 and libxslt are installed ** + Using build configuration of libxslt + /usr/lib/python3.4/distutils/dist.py:260: UserWarning: Unknown distribution option: 'bugtrack_url' + warnings.warn(msg) + + +# Setupcron # uwsgi enable threads -# Create superuser on migrate # register signals in app ready() -def ready(self): - if self.has_attr('ready_run'): return - self.ready_run = True - # database_ready(): connect to the database or inspect django connection -# beat.sh -# do settings validation on orchestra.apps.ready(), not during startime +# move Setting to contrib app __init__ +# cracklib vs crack +# remove system dependencies +# deprecate install_dependnecies in favour of only requirements.txt +# import module and sed +# if setting.value == default. remove +# cron backend: os.cron or uwsgi.cron +# reload generic admin view ?redirect=http... +# inspecting django db connection for asserting db readines? +# wake up django mailer on send_mail + +# project settings modified copy of django's default project settings diff --git a/orchestra/__init__.py b/orchestra/__init__.py index 892726ce..ea43b530 100644 --- a/orchestra/__init__.py +++ b/orchestra/__init__.py @@ -1,3 +1,5 @@ +default_app_config = 'orchestra.apps.OrchestraConfig' + VERSION = (0, 0, 1, 'alpha', 1) diff --git a/orchestra/apps.py b/orchestra/apps.py new file mode 100644 index 00000000..dcf13f6f --- /dev/null +++ b/orchestra/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class OrchestraConfig(AppConfig): + name = 'orchestra' + verbose_name = 'Orchestra' diff --git a/orchestra/bin/orchestra-admin b/orchestra/bin/orchestra-admin index 70b9d60d..6c0ebfc2 100755 --- a/orchestra/bin/orchestra-admin +++ b/orchestra/bin/orchestra-admin @@ -122,6 +122,11 @@ function install_requirements () { check_root || true ORCHESTRA_PATH=$(get_orchestra_dir) || true + # TODO reduce this list to 0 + # include /usr/sbin/named-checkzone + # wkhtmltopdf -> reportlab + # remove rabbit, postgres + # uwsgi –py-autoreload for devel APT="python3 \ python3-pip \ python3-psycopg2 \ @@ -137,6 +142,7 @@ function install_requirements () { ca-certificates \ gettext" + # TODO remove celery deps, django 1.8.1, glic3rinu fork, celery email PIP="django==1.8 \ django-celery-email==1.0.4 \ https://github.com/glic3rinu/django-fluent-dashboard/archive/master.zip \ @@ -202,12 +208,14 @@ function install_requirements () { run pip3 install $PIP + # TODO remove # Some versions of rabbitmq-server will not start automatically by default unless ... sed -i "s/# Default-Start:.*/# Default-Start: 2 3 4 5/" /etc/init.d/rabbitmq-server sed -i "s/# Default-Stop:.*/# Default-Stop: 0 1 6/" /etc/init.d/rabbitmq-server run update-rc.d rabbitmq-server defaults # Patch passlib + # TODO discover locaion by importing it IMPORT="from django.contrib.auth.hashers import mask_hash, _" COLLECTIONS="from collections import OrderedDict" ls /usr/local/lib/python*/dist-packages/passlib/ext/django/utils.py \ diff --git a/orchestra/contrib/domains/settings.py b/orchestra/contrib/domains/settings.py index 8e415570..62afd2fa 100644 --- a/orchestra/contrib/domains/settings.py +++ b/orchestra/contrib/domains/settings.py @@ -89,7 +89,7 @@ DOMAINS_DEFAULT_MX = Setting('DOMAINS_DEFAULT_MX', '10 mail.{}.'.format(ORCHESTRA_BASE_DOMAIN), '10 mail2.{}.'.format(ORCHESTRA_BASE_DOMAIN), ), - validators=[lambda mxs: map(validate_mx_record, mxs)], + validators=[lambda mxs: list(map(validate_mx_record, mxs))], help_text="Uses ORCHESTRA_BASE_DOMAIN by default." ) @@ -99,7 +99,7 @@ DOMAINS_DEFAULT_NS = Setting('DOMAINS_DEFAULT_NS', 'ns1.{}.'.format(ORCHESTRA_BASE_DOMAIN), 'ns2.{}.'.format(ORCHESTRA_BASE_DOMAIN), ), - validators=[lambda nss: map(validate_domain_name, nss)], + validators=[lambda nss: list(map(validate_domain_name, nss))], help_text="Uses ORCHESTRA_BASE_DOMAIN by default." ) @@ -118,6 +118,6 @@ DOMAINS_FORBIDDEN = Setting('DOMAINS_FORBIDDEN', DOMAINS_MASTERS = Setting('DOMAINS_MASTERS', (), - validators=[lambda masters: map(validate_ip_address, masters)], + validators=[lambda masters: list(map(validate_ip_address, masters))], help_text="Additional master server ip addresses other than autodiscovered by router.get_servers()." ) diff --git a/orchestra/contrib/orchestration/tasks.py b/orchestra/contrib/orchestration/tasks.py index 6483182f..3884d33e 100644 --- a/orchestra/contrib/orchestration/tasks.py +++ b/orchestra/contrib/orchestration/tasks.py @@ -1,9 +1,10 @@ from datetime import timedelta from celery.task.schedules import crontab -from celery.decorators import periodic_task from django.utils import timezone +from orchestra.contrib.tasks import periodic_task + from .models import BackendLog diff --git a/orchestra/contrib/resources/models.py b/orchestra/contrib/resources/models.py index 2454b5de..cfcd9a56 100644 --- a/orchestra/contrib/resources/models.py +++ b/orchestra/contrib/resources/models.py @@ -5,7 +5,6 @@ from django.db import models from django.utils import timezone from django.utils.functional import cached_property from django.utils.translation import ugettext_lazy as _ -from djcelery.models import CrontabSchedule from orchestra.core import validators from orchestra.models import queryset, fields @@ -64,7 +63,7 @@ class Resource(models.Model): "be prorcessed to match with unit. e.g. 10**9")) disable_trigger = models.BooleanField(_("disable trigger"), default=False, help_text=_("Disables monitors exeeded and recovery triggers")) - crontab = models.ForeignKey(CrontabSchedule, verbose_name=_("crontab"), + crontab = models.ForeignKey('djcelery.CrontabSchedule', verbose_name=_("crontab"), null=True, blank=True, help_text=_("Crontab for periodic execution. " "Leave it empty to disable periodic monitoring")) diff --git a/orchestra/contrib/resources/tasks.py b/orchestra/contrib/resources/tasks.py index c3593baf..7286cbb8 100644 --- a/orchestra/contrib/resources/tasks.py +++ b/orchestra/contrib/resources/tasks.py @@ -1,12 +1,11 @@ -from celery import shared_task - from orchestra.contrib.orchestration import Operation +from orchestra.contrib.tasks import task from orchestra.models.utils import get_model_field_path from .backends import ServiceMonitor -@shared_task(name='resources.Monitor') +@task(name='resources.Monitor') def monitor(resource_id, ids=None, async=True): from .models import ResourceData, Resource diff --git a/orchestra/contrib/settings/admin.py b/orchestra/contrib/settings/admin.py index cadff94c..f69131cc 100644 --- a/orchestra/contrib/settings/admin.py +++ b/orchestra/contrib/settings/admin.py @@ -86,6 +86,17 @@ class SettingView(generic.edit.FormView): messages.success(self.request, _("No changes have been detected.")) return super(SettingView, self).form_valid(form) +from orchestra.contrib.tasks import task +import time, sys +@task(name='rata') +def counter(num, log): + for i in range(1, num): + with open(log, 'a') as handler: + handler.write(str(i)) +# sys.stderr.write('hola\n') + time.sleep(1) +#counter.apply_async(10, '/tmp/kakas') + class SettingFileView(generic.TemplateView): template_name = 'admin/settings/view.html' diff --git a/orchestra/contrib/tasks/__init__.py b/orchestra/contrib/tasks/__init__.py new file mode 100644 index 00000000..dd2c46b0 --- /dev/null +++ b/orchestra/contrib/tasks/__init__.py @@ -0,0 +1,103 @@ +import traceback +from functools import partial, wraps, update_wrapper +from multiprocessing import Process +from uuid import uuid4 +from threading import Thread + +from celery import shared_task as celery_shared_task +from celery import states +from celery.decorators import periodic_task as celery_periodic_task +from django.utils import timezone + +from orchestra.utils.db import close_connection +from orchestra.utils.python import AttrDict, OrderedSet + + +def get_id(): + return str(uuid4()) + + +def get_name(fn): + return '.'.join((fn.__module__, fn.__name__)) + + +def keep_state(fn): + """ logs task on djcelery's TaskState model """ + @wraps(fn) + def wrapper(task_id, name, *args, **kwargs): + from djcelery.models import TaskState + now = timezone.now() + state = TaskState.objects.create(state=states.STARTED, task_id=task_id, name=name, args=str(args), + kwargs=str(kwargs), tstamp=now) + try: + result = fn(*args, **kwargs) + except Exception as exc: + state.state = states.FAILURE + state.traceback = traceback.format_exc() + state.runtime = (timezone.now()-now).total_seconds() + state.save() + return + # TODO send email + else: + state.state = states.SUCCESS + state.result = str(result) + state.runtime = (timezone.now()-now).total_seconds() + state.save() + return result + return wrapper + + +def apply_async(fn, name=None, method='thread'): + """ replaces celery apply_async """ + def inner(fn, name, method, *args, **kwargs): + task_id = get_id() + args = (task_id, name) + args + thread = Process(target=fn, args=args, kwargs=kwargs) + thread.start() + # Celery API compat + thread.request = AttrDict(id=task_id) + return thread + if name is None: + name = get_name(fn) + if method == 'thread': + method = Thread + elif method == 'process': + method = Process + else: + raise NotImplementedError("Support for %s concurrency method is not supported." % method) + fn.apply_async = partial(inner, close_connection(keep_state(fn)), name, method) + return fn + + +def apply_async_override(fn, name): + if fn is None: + def decorator(fn): + return update_wrapper(apply_async(fn), fn) + return decorator + return update_wrapper(apply_async(fn, name), fn) + + +def task(fn=None, **kwargs): + from . import settings + # register task + if fn is None: + fn = celery_shared_task(**kwargs) + else: + fn = celery_shared_task(fn) + if settings.TASKS_BACKEND in ('thread', 'process'): + name = kwargs.pop('name', None) + apply_async_override(fn, name) + return fn + + +def periodic_task(fn=None, **kwargs): + from . import settings + # register task + if fn is None: + fn = celery_periodic_task(**kwargs) + else: + fn = celery_periodic_task(fn) + if settings.TASKS_BACKEND in ('thread', 'process'): + name = kwargs.pop('name', None) + apply_async_override(fn, name) + return fn diff --git a/orchestra/contrib/tasks/admin.py b/orchestra/contrib/tasks/admin.py new file mode 100644 index 00000000..4c17b40a --- /dev/null +++ b/orchestra/contrib/tasks/admin.py @@ -0,0 +1,9 @@ +from django.utils.translation import ugettext_lazy as _ +from djcelery.admin import PeriodicTaskAdmin + +from orchestra.admin.utils import admin_date + + +display_last_run_at = admin_date('last_run_at', short_description=_("Last run")) + +PeriodicTaskAdmin.list_display = ('__unicode__', display_last_run_at, 'total_run_count', 'enabled') diff --git a/orchestra/contrib/tasks/beat.py b/orchestra/contrib/tasks/beat.py new file mode 100644 index 00000000..53ed2369 --- /dev/null +++ b/orchestra/contrib/tasks/beat.py @@ -0,0 +1,43 @@ +import json + +from celery import current_app +from celery.schedules import crontab_parser as CrontabParser +from django.utils import timezone +from djcelery.models import PeriodicTask + +from . import apply_async + + +def is_due(task, time=None): + if time is None: + time = timezone.now() + crontab = task.crontab + parts = map(int, time.strftime("%M %H %w %d %m").split()) + n_minute, n_hour, n_day_of_week, n_day_of_month, n_month_of_year = parts + return bool( + n_minute in CrontabParser(60).parse(crontab.minute) and + n_hour in CrontabParser(24).parse(crontab.hour) and + n_day_of_week in CrontabParser(7).parse(crontab.day_of_week) and + n_day_of_month in CrontabParser(31, 1).parse(crontab.day_of_month) and + n_month_of_year in CrontabParser(12, 1).parse(crontab.month_of_year) + ) + + +def run_task(task, thread=True, process=False, async=False): + args = json.loads(task.args) + kwargs = json.loads(task.kwargs) + task_fn = current_app.tasks.get(task.task) + if async: + method = 'process' if process else 'thread' + return apply_async(task_fn, method=method).apply_async(*args, **kwargs) + return task_fn(*args, **kwargs) + + +def run(): + now = timezone.now() + procs = [] + for task in PeriodicTask.objects.enabled().select_related('crontab'): + if is_due(task, now): + proc = run_task(task, process=True, async=True) + procs.append(proc) + [proc.join() for proc in procs] diff --git a/orchestra/contrib/tasks/bin/orchestra-beat b/orchestra/contrib/tasks/bin/orchestra-beat new file mode 100755 index 00000000..7ef9b5a3 --- /dev/null +++ b/orchestra/contrib/tasks/bin/orchestra-beat @@ -0,0 +1,83 @@ +#!/usr/bin/env python3 + +# High performance alternative to beat management command +# +# USAGE: beat /path/to/project/manage.py + + +import json +import os +import re +import sys +from datetime import datetime + +from orchestra.utils import db +from orchestra.utils.python import import_class +from orchestra.utils.sys import run, join + +from celery.schedules import crontab_parser as CrontabParser + + +def get_settings_file(manage): + with open(manage, 'r') as handler: + regex = re.compile(r'"DJANGO_SETTINGS_MODULE"\s*,\s*"([^"]+)"') + for line in handler.readlines(): + match = regex.search(line) + if match: + settings_module = match.groups()[0] + settings_file = os.path.join(*settings_module.split('.')) + '.py' + settings_file = os.path.join(os.path.dirname(manage), settings_file) + return settings_file + raise ValueError("settings module not found in %s" % manage) + + +def get_tasks(manage): + settings_file = get_settings_file(manage) + settings = db.get_settings(settings_file) + try: + conn = db.get_connection(settings) + except: + sys.stdout.write("ERROR") + sys.stderr.write("I am unable to connect to the database\n") + sys.exit(1) + script, settings_file = sys.argv[:2] + query = ( + "SELECT c.minute, c.hour, c.day_of_week, c.day_of_month, c.month_of_year, p.id " + "FROM djcelery_periodictask as p, djcelery_crontabschedule as c " + "WHERE p.crontab_id = c.id AND p.enabled = True" + ) + tasks = db.run_query(conn, query) + conn.close() + return tasks + + +def is_due(now, minute, hour, day_of_week, day_of_month, month_of_year): + n_minute, n_hour, n_day_of_week, n_day_of_month, n_month_of_year = now + return ( + n_minute in CrontabParser(60).parse(minute) and + n_hour in CrontabParser(24).parse(hour) and + n_day_of_week in CrontabParser(7).parse(day_of_week) and + n_day_of_month in CrontabParser(31, 1).parse(day_of_month) and + n_month_of_year in CrontabParser(12, 1).parse(month_of_year) + ) + + +if __name__ == "__main__": + manage = sys.argv[1] + now = datetime.utcnow() + now = tuple(map(int, now.strftime("%M %H %w %d %m").split())) + procs = [] + for minute, hour, day_of_week, day_of_month, month_of_year, task_id in get_tasks(manage): + if is_due(now, minute, hour, day_of_week, day_of_month, month_of_year): + command = 'python3 -W ignore::DeprecationWarning {manage} runtask {task_id}'.format( + manage=manage, task_id=task_id) + proc = run(command, async=True) + procs.append(proc) + code = 0 + for proc in procs: + result = join(proc) + sys.stdout.write(result.stdout.decode('utf8')) + sys.stderr.write(result.stderr.decode('utf8')) + if result.return_code != 0: + code = result.return_code + sys.exit(code) diff --git a/orchestra/contrib/tasks/management/commands/beat.py b/orchestra/contrib/tasks/management/commands/beat.py new file mode 100644 index 00000000..b54a8a0d --- /dev/null +++ b/orchestra/contrib/tasks/management/commands/beat.py @@ -0,0 +1,10 @@ +from django.core.management.base import BaseCommand, CommandError + +from ... import beat + + +class Command(BaseCommand): + help = 'Runs periodic tasks.' + + def handle(self, *args, **options): + beat.run() diff --git a/orchestra/contrib/tasks/management/commands/runfunction.py b/orchestra/contrib/tasks/management/commands/runfunction.py new file mode 100644 index 00000000..88ec2757 --- /dev/null +++ b/orchestra/contrib/tasks/management/commands/runfunction.py @@ -0,0 +1,32 @@ +from django.core.management.base import BaseCommand, CommandError + +from orchestra.utils.python import import_class + +from ... import keep_state, get_id, get_name + + +class Command(BaseCommand): + help = 'Runs Orchestra method.' + + def add_arguments(self, parser): + parser.add_argument('method', + help='Python path to the method to execute.') + parser.add_argument('args', nargs='*', + help='Additional arguments passed to the method.') + + def handle(self, *args, **options): + method = import_class(options['method']) + kwargs = {} + arguments = [] + for arg in args: + if '=' in args: + name, value = arg.split('=') + if value.isdigit(): + value = int(value) + kwargs[name] = value + else: + if arg.isdigit(): + arg = int(arg) + arguments.append(arg) + args = arguments + keep_state(method)(get_id(), get_name(method), *args, **kwargs) diff --git a/orchestra/contrib/tasks/management/commands/runtask.py b/orchestra/contrib/tasks/management/commands/runtask.py new file mode 100644 index 00000000..f5f6b909 --- /dev/null +++ b/orchestra/contrib/tasks/management/commands/runtask.py @@ -0,0 +1,49 @@ +import json + +from celery import current_app +from django.core.management.base import BaseCommand, CommandError +from django.utils import timezone +from djcelery.models import PeriodicTask + +from ... import keep_state, get_id, get_name + + +class Command(BaseCommand): + help = 'Runs Orchestra method.' + + def add_arguments(self, parser): + parser.add_argument('task', + help='Periodic task ID or task name.') + parser.add_argument('args', nargs='*', + help='Additional arguments passed to the task, when task name is used.') + + def handle(self, *args, **options): + + task = options.get('task') + if task.isdigit(): + # periodic task + ptask = PeriodicTask.objects.get(pk=int(task)) + task = current_app.tasks[ptask.task] + args = json.loads(ptask.args) + kwargs = json.loads(ptask.kwargs) + ptask.last_run_at = timezone.now() + ptask.total_run_count += 1 + ptask.save() + else: + # task name + task = current_app.tasks[task] + kwargs = {} + arguments = [] + for arg in args: + if '=' in args: + name, value = arg.split('=') + if value.isdigit(): + value = int(value) + kwargs[name] = value + else: + if arg.isdigit(): + arg = int(arg) + arguments.append(arg) + args = arguments + # Run task synchronously, but logging TaskState + keep_state(task)(get_id(), get_name(task), *args, **kwargs) diff --git a/orchestra/contrib/tasks/management/commands/syncperiodictasks.py b/orchestra/contrib/tasks/management/commands/syncperiodictasks.py new file mode 100644 index 00000000..d638d99e --- /dev/null +++ b/orchestra/contrib/tasks/management/commands/syncperiodictasks.py @@ -0,0 +1,2 @@ +# create crontab entries for defines periodic tasks + diff --git a/orchestra/contrib/tasks/parser.py b/orchestra/contrib/tasks/parser.py new file mode 100644 index 00000000..23cc2fae --- /dev/null +++ b/orchestra/contrib/tasks/parser.py @@ -0,0 +1,61 @@ +import os + + +# Rename module to handler.py +class CronHandler(object): + def __init__(self, filename): + self.content = None + self.filename = filename + + def read(self): + comments = [] + self.content = [] + with open(self.filename, 'r') as handler: + for line in handler.readlines(): + line = line.strip() + if line.startswith('#'): + comments.append(line) + else: + schedule = line.split()[:5] + command = ' '.join(line.split()[5:]).strip() + self.content.append((schedule, command, comments)) + comments = [] + + def save(self, backup=True): + if self.content is None: + raise Exception("First read() the cron file!") + if backup: + os.rename(self.filename, self.filename + '.backup') + with open(self.filename, 'w') as handler: + handler.write('\n'.join(self.content)) + handler.truncate() + self.reload() + + def reload(self): + pass + # TODO + + def remove(self, command): + if self.content is None: + raise Exception("First read() the cron file!") + new_content = [] + for c_schedule, c_command, c_comments in self.content: + if command != c_command: + new_content.append((c_schedule, c_command, c_comments)) + self.content = new_content + + def add_or_update(self, schedule, command, comments=None): + """ if content contains an equal command, its schedule is updated """ + if self.content is None: + raise Exception("First read() the cron file!") + new_content = [] + replaced = False + for c_schedule, c_command, c_comments in self.content: + if command == c_command: + replaced = True + new_content.append((schedule, command, comments or c_comments)) + else: + new_content.append((c_schedule, c_command, c_comments)) + if not replaced: + new_content.append((schedule, command, comments or [])) + self.content = new_content diff --git a/orchestra/contrib/tasks/schedules.py b/orchestra/contrib/tasks/schedules.py new file mode 100644 index 00000000..18a5d82a --- /dev/null +++ b/orchestra/contrib/tasks/schedules.py @@ -0,0 +1,119 @@ +#import re + + +#class CronTab(object): +# pass + + +#class ParseException(Exception): +# """Raised by crontab_parser when the input can't be parsed.""" + + +## https://github.com/celery/celery/blob/master/celery/schedules.py +#class CrontabParser(object): +# """Parser for crontab expressions. Any expression of the form 'groups' +# (see BNF grammar below) is accepted and expanded to a set of numbers. +# These numbers represent the units of time that the crontab needs to +# run on:: +# digit :: '0'..'9' +# dow :: 'a'..'z' +# number :: digit+ | dow+ +# steps :: number +# range :: number ( '-' number ) ? +# numspec :: '*' | range +# expr :: numspec ( '/' steps ) ? +# groups :: expr ( ',' expr ) * +# The parser is a general purpose one, useful for parsing hours, minutes and +# day_of_week expressions. Example usage:: +# >>> minutes = crontab_parser(60).parse('*/15') +# [0, 15, 30, 45] +# >>> hours = crontab_parser(24).parse('*/4') +# [0, 4, 8, 12, 16, 20] +# >>> day_of_week = crontab_parser(7).parse('*') +# [0, 1, 2, 3, 4, 5, 6] +# It can also parse day_of_month and month_of_year expressions if initialized +# with an minimum of 1. Example usage:: +# >>> days_of_month = crontab_parser(31, 1).parse('*/3') +# [1, 4, 7, 10, 13, 16, 19, 22, 25, 28, 31] +# >>> months_of_year = crontab_parser(12, 1).parse('*/2') +# [1, 3, 5, 7, 9, 11] +# >>> months_of_year = crontab_parser(12, 1).parse('2-12/2') +# [2, 4, 6, 8, 10, 12] +# The maximum possible expanded value returned is found by the formula:: +# max_ + min_ - 1 +# """ +# ParseException = ParseException + +# _range = r'(\w+?)-(\w+)' +# _steps = r'/(\w+)?' +# _star = r'\*' + +# def __init__(self, max_=60, min_=0): +# self.max_ = max_ +# self.min_ = min_ +# self.pats = ( +# (re.compile(self._range + self._steps), self._range_steps), +# (re.compile(self._range), self._expand_range), +# (re.compile(self._star + self._steps), self._star_steps), +# (re.compile('^' + self._star + '$'), self._expand_star), +# ) + +# def parse(self, spec): +# acc = set() +# for part in spec.split(','): +# if not part: +# raise self.ParseException('empty part') +# acc |= set(self._parse_part(part)) +# return acc + +# def _parse_part(self, part): +# for regex, handler in self.pats: +# m = regex.match(part) +# if m: +# return handler(m.groups()) +# return self._expand_range((part, )) + +# def _expand_range(self, toks): +# fr = self._expand_number(toks[0]) +# if len(toks) > 1: +# to = self._expand_number(toks[1]) +# if to < fr: # Wrap around max_ if necessary +# return (list(range(fr, self.min_ + self.max_)) + +# list(range(self.min_, to + 1))) +# return list(range(fr, to + 1)) +# return [fr] + +# def _range_steps(self, toks): +# if len(toks) != 3 or not toks[2]: +# raise self.ParseException('empty filter') +# return self._expand_range(toks[:2])[::int(toks[2])] + +# def _star_steps(self, toks): +# if not toks or not toks[0]: +# raise self.ParseException('empty filter') +# return self._expand_star()[::int(toks[0])] + +# def _expand_star(self, *args): +# return list(range(self.min_, self.max_ + self.min_)) + +# def _expand_number(self, s): +# if isinstance(s, str) and s[0] == '-': +# raise self.ParseException('negative numbers not supported') +# try: +# i = int(s) +# except ValueError: +# try: +# i = weekday(s) +# except KeyError: +# raise ValueError('Invalid weekday literal {0!r}.'.format(s)) + +# max_val = self.min_ + self.max_ - 1 +# if i > max_val: +# raise ValueError( +# 'Invalid end range: {0} > {1}.'.format(i, max_val)) +# if i < self.min_: +# raise ValueError( +# 'Invalid beginning range: {0} < {1}.'.format(i, self.min_)) + +# return i + diff --git a/orchestra/contrib/tasks/settings.py b/orchestra/contrib/tasks/settings.py new file mode 100644 index 00000000..95bbb6d9 --- /dev/null +++ b/orchestra/contrib/tasks/settings.py @@ -0,0 +1,11 @@ +from orchestra.settings import Setting + + +TASKS_BACKEND = Setting('TASKS_BACKEND', + 'thread', + choices=( + ('thread', "threading.Thread (no queue)"), + ('process', "multiprocess.Process (no queue)"), + ('celery', "Celery (with queue)"), + ) +) diff --git a/orchestra/contrib/tasks/utils.py b/orchestra/contrib/tasks/utils.py new file mode 100644 index 00000000..21c74df3 --- /dev/null +++ b/orchestra/contrib/tasks/utils.py @@ -0,0 +1,12 @@ +import threading + +from orchestra.utils.db import close_connection + + +# TODO import as_task + +def run(method, *args, **kwargs): + async = kwargs.pop('async', True) + thread = threading.Thread(target=close_connection(method), args=args, kwargs=kwargs) + thread = Process(target=close_connection(counter)) + thread.start() diff --git a/orchestra/forms/widgets.py b/orchestra/forms/widgets.py index 21db887b..d316717d 100644 --- a/orchestra/forms/widgets.py +++ b/orchestra/forms/widgets.py @@ -29,7 +29,7 @@ class SpanWidget(forms.Widget): return mark_safe('%s' % (icon, str(display))) tag = self.tag[:-1] endtag = '/'.join((self.tag[0], self.tag[1:])) - return mark_safe('%s%s >%s%s' % (tag, forms.util.flatatt(final_attrs), display, endtag)) + return mark_safe('%s%s >%s%s' % (tag, forms.utils.flatatt(final_attrs), display, endtag)) def value_from_datadict(self, data, files, name): return self.original diff --git a/orchestra/models/utils.py b/orchestra/models/utils.py index 58a288d1..79aab558 100644 --- a/orchestra/models/utils.py +++ b/orchestra/models/utils.py @@ -1,18 +1,18 @@ from django.conf import settings -from django.db.models import loading +from django.apps import apps import importlib def get_model(label, import_module=True): app_label, model_name = label.split('.') - model = loading.get_model(app_label, model_name) + model = apps.get_model(app_label, model_name) if model is None: # Sometimes the models module is not yet imported for app in settings.INSTALLED_APPS: if app.endswith(app_label): app_label = app importlib.import_module('%s.%s' % (app_label, 'admin')) - return loading.get_model(*label.split('.')) + return apps.get_model(*label.split('.')) return model diff --git a/orchestra/settings.py b/orchestra/settings.py index 85e1db05..28ea9b84 100644 --- a/orchestra/settings.py +++ b/orchestra/settings.py @@ -3,6 +3,7 @@ import sys from collections import OrderedDict from django.conf import settings +from django.core.checks import register, Error from django.core.exceptions import ValidationError, AppRegistryNotReady from django.core.validators import validate_email from django.db.models import get_model @@ -42,16 +43,6 @@ class Setting(object): for name, value in kwargs.items(): setattr(self, name, value) self.value = self.get_value(self.name, self.default) - try: - self.validate_value(self.value) - except ValidationError as exc: - # Init time warning - sys.stderr.write("Error validating setting %s with value %s\n" % (self.name, self.value)) - sys.stderr.write(format_exception(exc)) - raise exc - except AppRegistryNotReady: - # lazy bastards - pass self.settings[name] = self @classmethod @@ -117,6 +108,19 @@ class Setting(object): return getattr(cls.conf_settings, name, default) +@register() +def check_settings(app_configs, **kwargs): + """ perfroms all the validation """ + messages = [] + for name, setting in Setting.settings.items(): + try: + setting.validate_value(setting.value) + except ValidationError as exc: + msg = "Error validating setting with value %s: %s" % (setting.value, str(exc)) + messages.append(Error(msg, obj=name, id='settings.E001')) + return messages + + ORCHESTRA_BASE_DOMAIN = Setting('ORCHESTRA_BASE_DOMAIN', 'orchestra.lan', help_text=("Base domain name used for other settings.
" diff --git a/orchestra/utils/db.py b/orchestra/utils/db.py new file mode 100644 index 00000000..a90803a3 --- /dev/null +++ b/orchestra/utils/db.py @@ -0,0 +1,49 @@ +import ast + +from django import db + + +def close_connection(execute): + """ Threads have their own connection pool, closing it when finishing """ + def wrapper(*args, **kwargs): + try: + log = execute(*args, **kwargs) + except Exception as e: + pass + else: + wrapper.log = log + finally: + db.connection.close() + return wrapper + + +def get_settings(settings_file): + """ get db settings from settings.py file without importing """ + settings = {} + with open(settings_file, 'r') as handler: + body = ast.parse(handler.read()).body + for var in body: + targets = getattr(var, 'targets', None) + if targets and targets[0].id == 'DATABASES': + keys = var.value.values[0].keys + values = var.value.values[0].values + for key, value in zip(keys, values): + if key.s == 'ENGINE': + if not 'postgresql' in value.s: + raise ValueError("%s engine not supported." % value) + settings[key.s] = getattr(value, 's', None) + return settings + + +def get_connection(settings): + import psycopg2 + conn = psycopg2.connect("dbname='{NAME}' user='{USER}' host='{HOST}' password='{PASSWORD}'".format(**settings)) + return conn + + +def run_query(conn, query): + cur = conn.cursor() + cur.execute(query) + result = cur.fetchall() + cur.close() + return result diff --git a/orchestra/utils/sys.py b/orchestra/utils/sys.py index f461bca0..1cbce5c7 100644 --- a/orchestra/utils/sys.py +++ b/orchestra/utils/sys.py @@ -45,7 +45,7 @@ def read_async(fd): return '' -def runiterator(command, display=False, error_codes=[0], silent=False, stdin=b''): +def runiterator(command, display=False, stdin=b''): """ Subprocess wrapper for running commands concurrently """ if display: sys.stderr.write("\n\033[1m $ %s\033[0m\n" % command) @@ -83,6 +83,7 @@ def runiterator(command, display=False, error_codes=[0], silent=False, stdin=b'' state = _Attribute(stdout) state.stderr = stderr state.return_code = p.poll() + state.command = command yield state if state.return_code != None: @@ -90,13 +91,8 @@ def runiterator(command, display=False, error_codes=[0], silent=False, stdin=b'' p.stderr.close() raise StopIteration - -def run(command, display=False, error_codes=[0], silent=False, stdin=b'', async=False): - iterator = runiterator(command, display, error_codes, silent, stdin) - next(iterator) - if async: - return iterator - +def join(iterator, display=False, silent=False, error_codes=[0]): + """ joins the iterator process """ stdout = b'' stderr = b'' for state in iterator: @@ -114,7 +110,7 @@ def run(command, display=False, error_codes=[0], silent=False, stdin=b'', async= if return_code not in error_codes: out.failed = True msg = "\nrun() encountered an error (return code %s) while executing '%s'\n" - msg = msg % (return_code, command) + msg = msg % (return_code, state.command) if display: sys.stderr.write("\n\033[1;31mCommandError: %s %s\033[m\n" % (msg, err)) if not silent: @@ -124,6 +120,14 @@ def run(command, display=False, error_codes=[0], silent=False, stdin=b'', async= return out +def run(command, display=False, error_codes=[0], silent=False, stdin=b'', async=False): + iterator = runiterator(command, display, stdin) + next(iterator) + if async: + return iterator + return join(iterator, display=display, silent=silent, error_codes=error_codes) + + def sshrun(addr, command, *args, **kwargs): command = command.replace("'", """'"'"'""") cmd = "ssh -o stricthostkeychecking=no -C root@%s '%s'" % (addr, command) diff --git a/requirements.txt b/requirements.txt index a9f5a189..df348616 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ cracklib psycopg2 -django==1.8 +django==1.8.1 django-celery-email==1.0.4 https://github.com/glic3rinu/django-fluent-dashboard/archive/master.zip https://bitbucket.org/izi/django-admin-tools/get/a0abfffd76a0.zip diff --git a/setup.py b/setup.py index 10202805..804e8337 100644 --- a/setup.py +++ b/setup.py @@ -27,7 +27,10 @@ setup( "The goal of this project is to provide the tools for easily build a fully " "featured control panel that fits any service architecture."), include_package_data = True, - scripts=['orchestra/bin/orchestra-admin'], + scripts=[ + 'orchestra/bin/orchestra-admin', + 'orchestra/contrib/tasks/bin/orchestra-beat', + ], packages = packages, classifiers = [ 'Development Status :: 1 - Alpha',