Added tasks app

This commit is contained in:
Marc Aymerich 2015-05-03 21:26:17 +00:00
parent b97160b935
commit 27dbe6ca8d
5 changed files with 37 additions and 30 deletions

18
TODO.md
View file

@ -361,4 +361,20 @@ Collecting lxml==3.3.5 (from -r re (line 22))
# project settings modified copy of django's default project settings
# migrate accounts break on superuser insert because of orders signals
# migrate accounts break on superuser insert because of orders signals: read() + db_ready()
# if backend.async: don't join
# ngnix setup certificate
from orchestra.contrib.tasks import task
import time, sys
@task(name='rata')
def counter(num, log):
for i in range(1, num):
with open(log, 'a') as handler:
handler.write(str(i))
sys.stderr.write('hola\n')
time.sleep(1)
counter.apply_async(10, '/tmp/kakas')
# standard django deployment pracices (run checks)

View file

@ -148,6 +148,7 @@ class Resource(models.Model):
def monitor(self, async=True):
if async:
print(tasks.monitor.delay)
return tasks.monitor.delay(self.pk, async=async)
return tasks.monitor(self.pk, async=async)

View file

@ -50,15 +50,3 @@ def monitor(resource_id, ids=None, async=True):
triggers.append(op)
Operation.execute(triggers)
return logs
from orchestra.contrib.tasks import task
import time, sys
@task(name='rata')
def counter(num, log):
for i in range(1, num):
with open(log, 'a') as handler:
handler.write(str(i))
# sys.stderr.write('hola\n')
time.sleep(1)
#counter.apply_async(10, '/tmp/kakas')

View file

@ -103,6 +103,7 @@ class SettingFileView(generic.TemplateView):
return context
admin.site.register_url(r'^settings/setting/view/$', SettingFileView.as_view(), 'settings_setting_view')
admin.site.register_url(r'^settings/setting/$', SettingView.as_view(), 'settings_setting_change')
OrchestraIndexDashboard.register_link('Administration', 'settings_setting_change', _("Settings"))

View file

@ -52,7 +52,7 @@ def apply_async(fn, name=None, method='thread'):
def inner(fn, name, method, *args, **kwargs):
task_id = get_id()
args = (task_id, name) + args
thread = Process(target=fn, args=args, kwargs=kwargs)
thread = method(target=fn, args=args, kwargs=kwargs)
thread.start()
# Celery API compat
thread.request = AttrDict(id=task_id)
@ -66,28 +66,25 @@ def apply_async(fn, name=None, method='thread'):
else:
raise NotImplementedError("Support for %s concurrency method is not supported." % method)
fn.apply_async = partial(inner, close_connection(keep_state(fn)), name, method)
fn.delay = fn.apply_async
return fn
def apply_async_override(fn, name):
if fn is None:
def decorator(fn):
return update_wrapper(apply_async(fn), fn)
return decorator
return update_wrapper(apply_async(fn, name), fn)
def task(fn=None, **kwargs):
# TODO override this if 'celerybeat' in sys.argv ?
from . import settings
# register task
if fn is None:
fn = celery_shared_task(**kwargs)
else:
fn = celery_shared_task(fn)
if settings.TASKS_BACKEND in ('thread', 'process'):
def decorator(fn):
return apply_async(celery_shared_task(fn))
return decorator
else:
return celery_shared_task(**kwargs)
fn = update_wraper(partial(celery_shared_task, fn))
if settings.TASKS_BACKEND in ('thread', 'process'):
name = kwargs.pop('name', None)
apply_async_override(fn, name)
fn = update_wrapper(apply_async(fn, name), fn)
return fn
@ -95,10 +92,14 @@ def periodic_task(fn=None, **kwargs):
from . import settings
# register task
if fn is None:
fn = celery_periodic_task(**kwargs)
else:
fn = celery_periodic_task(fn)
if settings.TASKS_BACKEND in ('thread', 'process'):
def decorator(fn):
return apply_async(celery_periodic_task(fn))
return decorator
else:
return celery_periodic_task(**kwargs)
fn = update_wraper(partial(celery_periodic_task, fn))
if settings.TASKS_BACKEND in ('thread', 'process'):
name = kwargs.pop('name', None)
apply_async_override(fn, name)
fn = update_wrapper(apply_async(fn, name), fn)
return fn