django-orchestra-test/orchestra/contrib/tasks/management/commands/syncperiodictasks.py

16 lines
653 B
Python
Raw Normal View History

from django.core.management.base import BaseCommand, CommandError
from djcelery.app import app
from djcelery.schedulers import DatabaseScheduler
2015-05-03 17:44:46 +00:00
class Command(BaseCommand):
    """Management command that loads the periodic task schedule and prints it.

    Instantiating ``DatabaseScheduler`` for the Celery ``app`` yields the
    schedule; the command then reports how many entries it holds and lists
    each entry name next to its schedule.
    """

    help = 'Synchronizes the periodic tasks with the database scheduler and lists them.'

    def handle(self, *args, **options):
        """Print a header with the task count, then one aligned line per task.

        ``args`` and ``options`` are accepted to satisfy the management
        command interface but are not used here.
        """
        dbschedule = DatabaseScheduler(app=app)
        # Read the schedule once so the header count, the column width and
        # the listing all describe the same snapshot.
        # NOTE(review): ``schedule`` looks like a property that may re-check
        # the database on every access -- confirm against djcelery.
        schedule = dbschedule.schedule
        self.stdout.write('\033[1m%i periodic tasks have been syncronized:\033[0m' % len(schedule))
        if not schedule:
            # Nothing to list; max() below would raise ValueError on an
            # empty schedule.
            return
        # One extra column keeps a gap between the longest name and its schedule.
        width = max(len(name) for name in schedule) + 1
        for name, task in schedule.items():
            self.stdout.write(' %s%s' % (name.ljust(width), task.schedule))