#!/usr/bin/env python3

# High performance alternative to the beat management command.
# Looks for pending work before firing up all the Django machinery on separate processes.
#
# Handles orchestra.contrib.tasks periodic_tasks and orchestra.contrib.mailer queued mails.
#
# USAGE: beat /path/to/project/manage.py

import json
import os
import re
import sys
from datetime import datetime, timedelta

from orchestra.utils.sys import run, join, LockFile


# Cron weekday numbering (Sunday == 0), keyed by three-letter lowercase abbreviation.
_WEEKDAYS = {'sun': 0, 'mon': 1, 'tue': 2, 'wed': 3, 'thu': 4, 'fri': 5, 'sat': 6}


def weekday(name):
    """
    Return the cron weekday number (Sunday == 0) for a day name.

    Only the first three characters are significant and case is ignored, so
    'sun', 'Sunday' and 'SUN' all map to 0.  Local stand-in for
    celery.schedules.weekday, which crontab_parser depends on.

    Raises KeyError (carrying the original name) for unknown names.
    """
    try:
        return _WEEKDAYS[name[0:3].lower()]
    except KeyError:
        raise KeyError(name)


class crontab_parser(object):
    """
    from celery.schedules import crontab_parser
    Too expensive to import celery

    Expands one crontab field ('*', '*/5', '1-10', '1-10/2', '3,7,mon') into
    the set of integers it selects.  Valid values are the ``max_``
    consecutive integers starting at ``min_``.
    """
    ParseException = ValueError

    _range = r'(\w+?)-(\w+)'
    _steps = r'/(\w+)?'
    _star = r'\*'

    def __init__(self, max_=60, min_=0):
        self.max_ = max_
        self.min_ = min_
        # Order matters: the more specific "with step" patterns must be tried
        # before their step-less counterparts.
        self.pats = (
            (re.compile(self._range + self._steps), self._range_steps),
            (re.compile(self._range), self._expand_range),
            (re.compile(self._star + self._steps), self._star_steps),
            (re.compile('^' + self._star + '$'), self._expand_star),
        )

    def parse(self, spec):
        """
        Return the set of integers selected by the comma separated ``spec``.

        Raises ParseException (ValueError) on empty parts, empty step
        filters, negative numbers, unknown literals or out of range values.
        """
        acc = set()
        for part in spec.split(','):
            if not part:
                raise self.ParseException('empty part')
            acc |= set(self._parse_part(part))
        return acc

    def _parse_part(self, part):
        """ Expand a single comma-free part using the first matching pattern """
        for regex, handler in self.pats:
            m = regex.match(part)
            if m:
                return handler(m.groups())
        # No pattern matched: treat the part as a single number or day name.
        return self._expand_range((part, ))

    def _expand_range(self, toks):
        """ Expand ``(from,)`` or ``(from, to)`` tokens into a list of integers """
        fr = self._expand_number(toks[0])
        if len(toks) > 1:
            to = self._expand_number(toks[1])
            if to < fr:  # Wrap around max_ if necessary
                return (list(range(fr, self.min_ + self.max_)) +
                        list(range(self.min_, to + 1)))
            return list(range(fr, to + 1))
        return [fr]

    def _range_steps(self, toks):
        """ Expand ``from-to/step`` tokens """
        if len(toks) != 3 or not toks[2]:
            raise self.ParseException('empty filter')
        return self._expand_range(toks[:2])[::int(toks[2])]

    def _star_steps(self, toks):
        """ Expand ``*/step`` tokens """
        if not toks or not toks[0]:
            raise self.ParseException('empty filter')
        return self._expand_star()[::int(toks[0])]

    def _expand_star(self, *args):
        """ Return every valid value; ``args`` (regex groups) are ignored """
        return list(range(self.min_, self.max_ + self.min_))

    def _expand_number(self, s):
        """
        Convert a token into an in-range integer.

        Tokens that are not integers are resolved as weekday names.
        Raises ValueError for negative, unknown or out of range tokens.
        """
        if isinstance(s, str) and s[0] == '-':
            raise self.ParseException('negative numbers not supported')
        try:
            i = int(s)
        except ValueError:
            try:
                i = weekday(s)
            except KeyError:
                raise ValueError('Invalid weekday literal {0!r}.'.format(s))
        max_val = self.min_ + self.max_ - 1
        if i > max_val:
            raise ValueError(
                'Invalid end range: {0} > {1}.'.format(i, max_val))
        if i < self.min_:
            raise ValueError(
                'Invalid beginning range: {0} < {1}.'.format(i, self.min_))
        return i


class Setting(object):
    """ Locates and evaluates the project settings file referenced by manage.py """

    def __init__(self, manage):
        self.manage = manage
        self.settings_file = self.get_settings_file(manage)

    def get_settings(self):
        """
        get db settings from settings.py file without importing

        Returns the namespace dict produced by executing the settings file,
        with lines that pull in djcelery stripped out.
        """
        settings = {'__file__': self.settings_file}
        with open(self.settings_file) as f:
            # Loading djcelery is very costly, skip those lines
            content = ''.join(
                line for line in f
                if not line.startswith(('import djcelery', 'djcelery.setup_loader()'))
            )
        # NOTE: this executes the settings module source; it must be a trusted file.
        exec(content, settings)
        return settings

    def get_settings_file(self, manage):
        """
        Return the path of the settings file named in ``manage``.

        Looks for the first double-quoted "DJANGO_SETTINGS_MODULE", "<module>"
        pair in the manage.py source and maps the dotted module to a .py path
        relative to the directory containing manage.py.

        Raises ValueError when no such pair is found.
        """
        regex = re.compile(r'"DJANGO_SETTINGS_MODULE"\s*,\s*"([^"]+)"')
        with open(manage, 'r') as handler:
            for line in handler:
                match = regex.search(line)
                if match:
                    settings_module = match.groups()[0]
                    settings_file = os.path.join(*settings_module.split('.')) + '.py'
                    return os.path.join(os.path.dirname(manage), settings_file)
        raise ValueError("settings module not found in %s" % manage)


class DB(object):
    """ Minimal database access for the project's 'default' database """

    POSTGRESQL_ENGINES = (
        'django.db.backends.postgresql_psycopg2',
        # Name used by newer Django releases for the same backend
        'django.db.backends.postgresql',
    )

    def __init__(self, settings):
        self.settings = settings['DATABASES']['default']

    def connect(self):
        """
        Open ``self.conn`` for the configured engine (sqlite3 or PostgreSQL).

        Raises ValueError for any other engine.
        """
        engine = self.settings['ENGINE']
        if engine == 'django.db.backends.sqlite3':
            import sqlite3
            self.conn = sqlite3.connect(self.settings['NAME'])
        elif engine in self.POSTGRESQL_ENGINES:
            import psycopg2
            # Keyword arguments let psycopg2 do the quoting, so credentials
            # containing quotes or backslashes do not corrupt the DSN.
            self.conn = psycopg2.connect(
                dbname=self.settings['NAME'],
                user=self.settings['USER'],
                host=self.settings['HOST'],
                password=self.settings['PASSWORD'],
                # An empty or missing PORT means "use the default port"
                port=self.settings.get('PORT') or None,
            )
        else:
            raise ValueError("%s engine not supported." % engine)

    def query(self, query):
        """ Execute ``query`` and return all rows; the cursor is always closed """
        cur = self.conn.cursor()
        try:
            cur.execute(query)
            result = cur.fetchall()
        finally:
            cur.close()
        return result

    def close(self):
        """ Close the connection opened by connect() """
        self.conn.close()


def fire_pending_tasks(manage, db):
    """
    Launch ``manage runtask <id>`` for every enabled periodic task whose
    crontab schedule matches the current UTC minute.

    Generator: yields whatever ``run(command, run_async=True)`` returns for
    each launched command.
    """
    def get_tasks(db):
        # sqlite stores booleans as integers
        enabled = 1 if 'sqlite' in db.settings['ENGINE'] else True
        query = (
            "SELECT c.minute, c.hour, c.day_of_week, c.day_of_month, c.month_of_year, p.id "
            "FROM djcelery_periodictask as p, djcelery_crontabschedule as c "
            "WHERE p.crontab_id = c.id AND p.enabled = {}"
        ).format(enabled)
        return db.query(query)

    def is_due(now, minute, hour, day_of_week, day_of_month, month_of_year):
        n_minute, n_hour, n_day_of_week, n_day_of_month, n_month_of_year = now
        return (
            n_minute in crontab_parser(60).parse(minute) and
            n_hour in crontab_parser(24).parse(hour) and
            n_day_of_week in crontab_parser(7).parse(day_of_week) and
            n_day_of_month in crontab_parser(31, 1).parse(day_of_month) and
            n_month_of_year in crontab_parser(12, 1).parse(month_of_year)
        )

    now = datetime.utcnow()
    # (minute, hour, weekday with Sunday == 0, day of month, month) as integers
    now = tuple(map(int, now.strftime("%M %H %w %d %m").split()))
    for minute, hour, day_of_week, day_of_month, month_of_year, task_id in get_tasks(db):
        if is_due(now, minute, hour, day_of_week, day_of_month, month_of_year):
            command = 'python3 -W ignore::DeprecationWarning {manage} runtask {task_id}'.format(
                manage=manage, task_id=task_id)
            yield run(command, run_async=True)


def fire_pending_messages(settings, db, manage=None):
    """
    Launch ``manage sendpendingmessages`` when the mailer has queued messages
    or deferred messages whose retry delay has elapsed.

    ``manage`` is the path to manage.py.  When omitted, the module-level
    ``manage`` set by the script entry point is used, as older callers relied
    on (NameError is raised if it does not exist).

    Generator: yields at most one value, whatever
    ``run(command, run_async=True)`` returns.
    """
    def has_pending_messages(settings, db):
        MAILER_DEFERE_SECONDS = settings.get('MAILER_DEFERE_SECONDS', (300, 600, 60*60, 60*60*24))
        now = datetime.utcnow()
        # The n-th retry becomes due once its n-th delay has elapsed since last_try
        query_or = [
            "(mailer_message.retries = %i AND mailer_message.last_try <= '%s')"
            % (num, (now - timedelta(seconds=seconds)).isoformat(sep=' '))
            for num, seconds in enumerate(MAILER_DEFERE_SECONDS)
        ]
        state_filter = "mailer_message.state = 'QUEUED'"
        if query_or:
            # Skipped when no retry delays are configured: "AND ()" is invalid SQL
            state_filter += (
                " OR (mailer_message.state = 'DEFERRED' AND (%s))" % ' OR '.join(query_or)
            )
        query = "SELECT 1 FROM mailer_message WHERE (%s) LIMIT 1" % state_filter
        return bool(db.query(query))

    if has_pending_messages(settings, db):
        if manage is None:
            try:
                manage = globals()['manage']
            except KeyError:
                raise NameError("name 'manage' is not defined") from None
        command = 'python3 -W ignore::DeprecationWarning {manage} sendpendingmessages'.format(
            manage=manage)
        yield run(command, run_async=True)


if __name__ == "__main__":
    if len(sys.argv) < 2:
        sys.exit("USAGE: beat /path/to/project/manage.py")
    with LockFile('/dev/shm/beat.lock', expire=20):
        manage = sys.argv[1]
        procs = []
        settings = Setting(manage).get_settings()
        db = DB(settings)
        db.connect()
        try:
            # Non-blocking loop, we need to finish this in time for the next minute.
            if 'orchestra.contrib.tasks' in settings['INSTALLED_APPS']:
                if settings.get('TASKS_BACKEND', 'thread') in ('thread', 'process'):
                    procs.extend(fire_pending_tasks(manage, db))
            if 'orchestra.contrib.mailer' in settings['INSTALLED_APPS']:
                procs.extend(fire_pending_messages(settings, db, manage=manage))
        finally:
            db.close()
        sys.exit(0)