#!/usr/bin/env python3

# High performance alternative to beat management command
# Looks for pending work before firing up all the Django machinery on separate processes
#
# Handles orchestra.contrib.tasks periodic_tasks and orchestra.contrib.mailer queued mails
#
# USAGE: beat /path/to/project/manage.py

import inspect
import json
import os
import re
import sys
from datetime import datetime, timedelta

from celery.schedules import crontab_parser as CrontabParser

from orchestra.utils.sys import run, join, LockFile


def _run_async(command):
    """Start ``command`` through orchestra's ``run()`` in asynchronous mode.

    ``async`` is a reserved keyword since Python 3.7, so it can no longer be
    written literally as a keyword argument; the flag name is therefore passed
    through ``**kwargs``.

    NOTE(review): newer orchestra releases are assumed to expose the flag as
    ``run_async``; the legacy name ``async`` is used when ``run_async`` is not
    part of the signature -- confirm against the installed orchestra version.
    """
    parameters = inspect.signature(run).parameters
    flag = 'run_async' if 'run_async' in parameters else 'async'
    return run(command, **{flag: True})


class Setting(object):
    """Locates and loads a project's settings file starting from its manage.py."""

    def __init__(self, manage):
        # manage: filesystem path of the project's manage.py
        self.manage = manage
        self.settings_file = self.get_settings_file(manage)

    def get_settings(self):
        """Return the settings namespace without importing the settings module.

        The settings file is executed with ``exec`` into a fresh dict whose
        ``__file__`` entry points at the settings file; that dict is returned.
        """
        settings = {'__file__': self.settings_file}
        with open(self.settings_file) as f:
            # NOTE: exec() runs the settings file as code; it must be a trusted,
            # project-owned file.
            exec(f.read(), settings)
        return settings

    def get_settings_file(self, manage):
        """Return the settings file path derived from DJANGO_SETTINGS_MODULE in ``manage``.

        The dotted module name found in manage.py is turned into a ``.py`` path
        relative to the directory that contains manage.py.

        Raises:
            ValueError: if no DJANGO_SETTINGS_MODULE declaration is found.
        """
        # Accept both single and double quoted declarations.
        regex = re.compile(r'''["']DJANGO_SETTINGS_MODULE["']\s*,\s*["']([^"']+)["']''')
        with open(manage, 'r') as handler:
            for line in handler:
                match = regex.search(line)
                if match:
                    settings_module = match.group(1)
                    settings_file = os.path.join(*settings_module.split('.')) + '.py'
                    return os.path.join(os.path.dirname(manage), settings_file)
        raise ValueError("settings module not found in %s" % manage)


class DB(object):
    """Minimal DB-API wrapper around the project's ``default`` database."""

    POSTGRESQL_ENGINES = (
        'django.db.backends.postgresql_psycopg2',
        'django.db.backends.postgresql',
    )

    def __init__(self, settings):
        self.settings = settings['DATABASES']['default']
        self.conn = None

    def connect(self):
        """Open the connection according to the configured ENGINE.

        Raises:
            ValueError: if the engine is neither sqlite3 nor PostgreSQL.
        """
        engine = self.settings['ENGINE']
        if engine == 'django.db.backends.sqlite3':
            import sqlite3
            self.conn = sqlite3.connect(self.settings['NAME'])
        elif engine in self.POSTGRESQL_ENGINES:
            import psycopg2
            params = {
                'dbname': self.settings.get('NAME'),
                'user': self.settings.get('USER'),
                'host': self.settings.get('HOST'),
                'port': self.settings.get('PORT'),
                'password': self.settings.get('PASSWORD'),
            }
            # Keyword arguments let the driver do the quoting, so values that
            # contain quotes or spaces (e.g. passwords) do not break the DSN.
            # Unset/empty values are left out so driver defaults apply.
            self.conn = psycopg2.connect(
                **{key: value for key, value in params.items() if value not in (None, '')}
            )
        else:
            raise ValueError("%s engine not supported." % engine)

    def query(self, query):
        """Execute ``query`` and return all fetched rows; the cursor is always closed."""
        cur = self.conn.cursor()
        try:
            cur.execute(query)
            return cur.fetchall()
        finally:
            cur.close()

    def close(self):
        """Close the connection if one has been opened."""
        if self.conn is not None:
            self.conn.close()


def fire_pending_tasks(manage, db):
    """Yield one asynchronous ``runtask`` process per enabled periodic task due now.

    Args:
        manage: path of the project's manage.py, used to build the command.
        db: connected ``DB`` instance.

    This is a generator: nothing is queried or started until it is iterated.
    """
    def get_tasks(db):
        # sqlite gets the integer 1 for the enabled flag, other engines get True
        enabled = 1 if 'sqlite' in db.settings['ENGINE'] else True
        query = (
            "SELECT c.minute, c.hour, c.day_of_week, c.day_of_month, c.month_of_year, p.id "
            "FROM djcelery_periodictask as p, djcelery_crontabschedule as c "
            "WHERE p.crontab_id = c.id AND p.enabled = {}"
        ).format(enabled)
        return db.query(query)

    def is_due(now, minute, hour, day_of_week, day_of_month, month_of_year):
        # Every crontab field has to match the corresponding component of `now`
        n_minute, n_hour, n_day_of_week, n_day_of_month, n_month_of_year = now
        return (
            n_minute in CrontabParser(60).parse(minute) and
            n_hour in CrontabParser(24).parse(hour) and
            n_day_of_week in CrontabParser(7).parse(day_of_week) and
            n_day_of_month in CrontabParser(31, 1).parse(day_of_month) and
            n_month_of_year in CrontabParser(12, 1).parse(month_of_year)
        )

    # (minute, hour, weekday, day of month, month) of the current UTC time
    now = datetime.utcnow()
    now = tuple(map(int, now.strftime("%M %H %w %d %m").split()))
    for minute, hour, day_of_week, day_of_month, month_of_year, task_id in get_tasks(db):
        if is_due(now, minute, hour, day_of_week, day_of_month, month_of_year):
            command = 'python3 -W ignore::DeprecationWarning {manage} runtask {task_id}'.format(
                manage=manage, task_id=task_id)
            yield _run_async(command)


def fire_pending_messages(settings, db, manage=None):
    """Yield an asynchronous ``sendpendingmessages`` process if any mail is pending.

    Args:
        settings: settings namespace (dict) as returned by ``Setting.get_settings``.
        db: connected ``DB`` instance.
        manage: path of the project's manage.py. When omitted, the module-level
            ``manage`` set by the script entry point is used, which is what this
            function relied on before the parameter existed.

    This is a generator: nothing is queried or started until it is iterated.
    """
    def has_pending_messages(settings, db):
        defer_seconds = settings.get('MAILER_DEFERE_SECONDS', (300, 600, 60*60, 60*60*24))
        now = datetime.utcnow()
        retry_clauses = []
        # One clause per deferral step: a message retried `retries` times is due
        # once its last retry is older than the matching delay.
        # NOTE(review): previously every clause compared against retries = 0 and
        # the enumerate index was unused; assumed to mirror the mailer engine's
        # retry schedule -- confirm.
        for retries, seconds in enumerate(defer_seconds):
            epoch = now - timedelta(seconds=seconds)
            retry_clauses.append(
                "(mailer_message.retries = %i AND mailer_message.last_retry <= '%s')"
                % (retries, epoch.isoformat(' '))
            )
        state_filter = "mailer_message.state = 'QUEUED'"
        if retry_clauses:
            # Without any configured delay an empty "AND ()" would be invalid SQL,
            # so the DEFERRED branch is only added when there is something to test.
            state_filter += " OR (mailer_message.state = 'DEFERRED' AND (%s))" % (
                ' OR '.join(retry_clauses))
        query = "SELECT 1 FROM mailer_message WHERE (%s) LIMIT 1" % state_filter
        return bool(db.query(query))

    if manage is None:
        manage = globals()['manage']
    if has_pending_messages(settings, db):
        command = 'python3 -W ignore::DeprecationWarning {manage} sendpendingmessages'.format(
            manage=manage)
        yield _run_async(command)


if __name__ == "__main__":
    if len(sys.argv) < 2:
        sys.stderr.write("USAGE: beat /path/to/project/manage.py\n")
        sys.exit(2)
    with LockFile('/dev/shm/beat.lock', expire=20):
        manage = sys.argv[1]
        procs = []
        settings = Setting(manage).get_settings()
        db = DB(settings)
        db.connect()
        try:
            # Non-blocking loop, we need to finish this in time for the next minute.
            if 'orchestra.contrib.tasks' in settings['INSTALLED_APPS']:
                if settings.get('TASKS_BACKEND', 'thread') in ('thread', 'process'):
                    procs.extend(fire_pending_tasks(manage, db))
            if 'orchestra.contrib.mailer' in settings['INSTALLED_APPS']:
                procs.extend(fire_pending_messages(settings, db, manage=manage))
        finally:
            db.close()
        sys.exit(0)