From 27dbe6ca8dd5158f30b5858600dfe0eee414f1e4 Mon Sep 17 00:00:00 2001 From: Marc Aymerich Date: Sun, 3 May 2015 21:26:17 +0000 Subject: [PATCH] Added tasks app --- TODO.md | 18 +++++++++++++- orchestra/contrib/resources/models.py | 1 + orchestra/contrib/resources/tasks.py | 12 --------- orchestra/contrib/settings/admin.py | 1 + orchestra/contrib/tasks/__init__.py | 35 ++++++++++++++------------- 5 files changed, 37 insertions(+), 30 deletions(-) diff --git a/TODO.md b/TODO.md index 4e2e0961..f48ce942 100644 --- a/TODO.md +++ b/TODO.md @@ -361,4 +361,20 @@ Collecting lxml==3.3.5 (from -r re (line 22)) # project settings modified copy of django's default project settings -# migrate accounts break on superuser insert because of orders signals +# migrate accounts break on superuser insert because of orders signals: read() + db_ready() + +# if backend.async: don't join + +# ngnix setup certificate + from orchestra.contrib.tasks import task + import time, sys + @task(name='rata') + def counter(num, log): + for i in range(1, num): + with open(log, 'a') as handler: + handler.write(str(i)) + sys.stderr.write('hola\n') + time.sleep(1) + counter.apply_async(10, '/tmp/kakas') + +# standard django deployment pracices (run checks) diff --git a/orchestra/contrib/resources/models.py b/orchestra/contrib/resources/models.py index cfcd9a56..df9ceb29 100644 --- a/orchestra/contrib/resources/models.py +++ b/orchestra/contrib/resources/models.py @@ -148,6 +148,7 @@ class Resource(models.Model): def monitor(self, async=True): if async: + print(tasks.monitor.delay) return tasks.monitor.delay(self.pk, async=async) return tasks.monitor(self.pk, async=async) diff --git a/orchestra/contrib/resources/tasks.py b/orchestra/contrib/resources/tasks.py index 34ecd3ec..7286cbb8 100644 --- a/orchestra/contrib/resources/tasks.py +++ b/orchestra/contrib/resources/tasks.py @@ -50,15 +50,3 @@ def monitor(resource_id, ids=None, async=True): triggers.append(op) Operation.execute(triggers) return logs - - -from orchestra.contrib.tasks import task -import time, sys -@task(name='rata') -def counter(num, log): - for i in range(1, num): - with open(log, 'a') as handler: - handler.write(str(i)) -# sys.stderr.write('hola\n') - time.sleep(1) -#counter.apply_async(10, '/tmp/kakas') diff --git a/orchestra/contrib/settings/admin.py b/orchestra/contrib/settings/admin.py index cadff94c..3d483690 100644 --- a/orchestra/contrib/settings/admin.py +++ b/orchestra/contrib/settings/admin.py @@ -103,6 +103,7 @@ class SettingFileView(generic.TemplateView): return context + admin.site.register_url(r'^settings/setting/view/$', SettingFileView.as_view(), 'settings_setting_view') admin.site.register_url(r'^settings/setting/$', SettingView.as_view(), 'settings_setting_change') OrchestraIndexDashboard.register_link('Administration', 'settings_setting_change', _("Settings")) diff --git a/orchestra/contrib/tasks/__init__.py b/orchestra/contrib/tasks/__init__.py index af70fc11..ce00dae0 100644 --- a/orchestra/contrib/tasks/__init__.py +++ b/orchestra/contrib/tasks/__init__.py @@ -52,7 +52,7 @@ def apply_async(fn, name=None, method='thread'): def inner(fn, name, method, *args, **kwargs): task_id = get_id() args = (task_id, name) + args - thread = Process(target=fn, args=args, kwargs=kwargs) + thread = method(target=fn, args=args, kwargs=kwargs) thread.start() # Celery API compat thread.request = AttrDict(id=task_id) @@ -66,28 +66,25 @@ def apply_async(fn, name=None, method='thread'): else: raise NotImplementedError("Support for %s concurrency method is not supported." % method) fn.apply_async = partial(inner, close_connection(keep_state(fn)), name, method) + fn.delay = fn.apply_async return fn -def apply_async_override(fn, name): - if fn is None: - def decorator(fn): - return update_wrapper(apply_async(fn), fn) - return decorator - return update_wrapper(apply_async(fn, name), fn) - - def task(fn=None, **kwargs): # TODO override this if 'celerybeat' in sys.argv ? from . import settings # register task if fn is None: - fn = celery_shared_task(**kwargs) - else: - fn = celery_shared_task(fn) + if settings.TASKS_BACKEND in ('thread', 'process'): + def decorator(fn): + return apply_async(celery_shared_task(fn)) + return decorator + else: + return celery_shared_task(**kwargs) + fn = update_wraper(partial(celery_shared_task, fn)) if settings.TASKS_BACKEND in ('thread', 'process'): name = kwargs.pop('name', None) - apply_async_override(fn, name) + fn = update_wrapper(apply_async(fn, name), fn) return fn @@ -95,10 +92,14 @@ def periodic_task(fn=None, **kwargs): from . import settings # register task if fn is None: - fn = celery_periodic_task(**kwargs) - else: - fn = celery_periodic_task(fn) + if settings.TASKS_BACKEND in ('thread', 'process'): + def decorator(fn): + return apply_async(celery_periodic_task(fn)) + return decorator + else: + return celery_periodic_task(**kwargs) + fn = update_wraper(partial(celery_periodic_task, fn)) if settings.TASKS_BACKEND in ('thread', 'process'): name = kwargs.pop('name', None) - apply_async_override(fn, name) + fn = update_wrapper(apply_async(fn, name), fn) return fn