From 31ac035c27b32eb94f35afb6be16af97632a6d96 Mon Sep 17 00:00:00 2001 From: Marc Aymerich Date: Wed, 6 May 2015 10:51:12 +0000 Subject: [PATCH] Non-blocking beat --- TODO.md | 3 + orchestra/bin/orchestra-beat | 42 ++++----- orchestra/contrib/mailer/engine.py | 42 ++++----- orchestra/contrib/resources/tasks.py | 85 ++++++++++--------- orchestra/contrib/webapps/backends/php.py | 32 ++++--- orchestra/contrib/websites/backends/apache.py | 24 ++++-- orchestra/utils/sys.py | 32 +++++++ 7 files changed, 155 insertions(+), 105 deletions(-) diff --git a/TODO.md b/TODO.md index 37cfbe83..88b73097 100644 --- a/TODO.md +++ b/TODO.md @@ -386,3 +386,6 @@ try: import uwsgi to know its running uwsgi # don't block on beat, and --report periodic tasks # Deprecate restart/start/stop services (do touch wsgi.py and fuck celery) + + +# high perf apache sync, diff --git a/orchestra/bin/orchestra-beat b/orchestra/bin/orchestra-beat index 13197f4b..26b8a5d5 100755 --- a/orchestra/bin/orchestra-beat +++ b/orchestra/bin/orchestra-beat @@ -16,7 +16,7 @@ from datetime import datetime, timedelta from celery.schedules import crontab_parser as CrontabParser -from orchestra.utils.sys import run, join +from orchestra.utils.sys import run, join, LockFile class Setting(object): @@ -126,27 +126,21 @@ def fire_pending_messages(settings, db): if __name__ == "__main__": - # TODO aquire lock - manage = sys.argv[1] - procs = [] - settings = Setting(manage).get_settings() - db = DB(settings) - db.connect() - try: - if 'orchestra.contrib.tasks' in settings['INSTALLED_APPS']: - if settings.get('TASKS_BACKEND', 'thread') in ('thread', 'process'): - for proc in fire_pending_tasks(manage, db): + with LockFile('/dev/shm/beat.lock', expire=20): + manage = sys.argv[1] + procs = [] + settings = Setting(manage).get_settings() + db = DB(settings) + db.connect() + try: + # Non-blocking loop, we need to finish this in time for the next minute. + if 'orchestra.contrib.tasks' in settings['INSTALLED_APPS']: + if settings.get('TASKS_BACKEND', 'thread') in ('thread', 'process'): + for proc in fire_pending_tasks(manage, db): + procs.append(proc) + if 'orchestra.contrib.mailer' in settings['INSTALLED_APPS']: + for proc in fire_pending_messages(settings, db): procs.append(proc) - if 'orchestra.contrib.mailer' in settings['INSTALLED_APPS']: - for proc in fire_pending_messages(settings, db): - procs.append(proc) - exit_code = 0 - for proc in procs: - result = join(proc) - sys.stdout.write(result.stdout.decode('utf8')) - sys.stderr.write(result.stderr.decode('utf8')) - if result.return_code != 0: - exit_code = result.return_code - finally: - db.close() - sys.exit(exit_code) + finally: + db.close() + sys.exit(0) diff --git a/orchestra/contrib/mailer/engine.py b/orchestra/contrib/mailer/engine.py index 8e8e2fe3..a9c0a2ed 100644 --- a/orchestra/contrib/mailer/engine.py +++ b/orchestra/contrib/mailer/engine.py @@ -4,6 +4,8 @@ from socket import error as SocketError from django.core.mail import get_connection from django.utils.encoding import smart_str +from orchestra.utils.sys import LockFile + from .models import Message @@ -29,23 +31,23 @@ def send_message(message, num=0, connection=None, bulk=100): def send_pending(bulk=100): - # TODO aquire lock - connection = None - num = 0 - for message in Message.objects.filter(state=Message.QUEUED).order_by('priority'): - send_message(message, num, connection, bulk) - num += 1 - from django.utils import timezone - from . import settings - from datetime import timedelta - from django.db.models import Q - - now = timezone.now() - qs = Q() - for retries, seconds in enumerate(settings.MAILER_DEFERE_SECONDS): - delta = timedelta(seconds=seconds) - qs = qs | Q(retries=retries, last_retry__lte=now-delta) - for message in Message.objects.filter(state=Message.DEFERRED).filter(qs).order_by('priority'): - send_message(message, num, connection, bulk) - if connection is not None: - connection.close() + with LockFile('/dev/shm/mailer.send_pending.lock'): + connection = None + num = 0 + for message in Message.objects.filter(state=Message.QUEUED).order_by('priority'): + send_message(message, num, connection, bulk) + num += 1 + from django.utils import timezone + from . import settings + from datetime import timedelta + from django.db.models import Q + + now = timezone.now() + qs = Q() + for retries, seconds in enumerate(settings.MAILER_DEFERE_SECONDS): + delta = timedelta(seconds=seconds) + qs = qs | Q(retries=retries, last_retry__lte=now-delta) + for message in Message.objects.filter(state=Message.DEFERRED).filter(qs).order_by('priority'): + send_message(message, num, connection, bulk) + if connection is not None: + connection.close() diff --git a/orchestra/contrib/resources/tasks.py b/orchestra/contrib/resources/tasks.py index 7286cbb8..157cd25f 100644 --- a/orchestra/contrib/resources/tasks.py +++ b/orchestra/contrib/resources/tasks.py @@ -1,52 +1,53 @@ from orchestra.contrib.orchestration import Operation from orchestra.contrib.tasks import task from orchestra.models.utils import get_model_field_path +from orchestra.utils.sys import LockFile from .backends import ServiceMonitor @task(name='resources.Monitor') def monitor(resource_id, ids=None, async=True): - from .models import ResourceData, Resource - - resource = Resource.objects.get(pk=resource_id) - resource_model = resource.content_type.model_class() - logs = [] - # Execute monitors - for monitor_name in resource.monitors: - backend = ServiceMonitor.get_backend(monitor_name) - model = backend.model_class() - kwargs = {} - if ids: - path = get_model_field_path(model, resource_model) - path = '%s__in' % ('__'.join(path) or 'id') - kwargs = { - path: ids - } - # Execute monitor - monitorings = [] + with LockFile('/dev/shm/resources.monitor.lock', expire=60*60): + from .models import ResourceData, Resource + resource = Resource.objects.get(pk=resource_id) + resource_model = resource.content_type.model_class() + logs = [] + # Execute monitors + for monitor_name in resource.monitors: + backend = ServiceMonitor.get_backend(monitor_name) + model = backend.model_class() + kwargs = {} + if ids: + path = get_model_field_path(model, resource_model) + path = '%s__in' % ('__'.join(path) or 'id') + kwargs = { + path: ids + } + # Execute monitor + monitorings = [] + for obj in model.objects.filter(**kwargs): + op = Operation(backend, obj, Operation.MONITOR) + monitorings.append(op) + # TODO async=True only when running with celery + # monitor.request.id + logs += Operation.execute(monitorings, async=async) + + kwargs = {'id__in': ids} if ids else {} + # Update used resources and trigger resource exceeded and revovery + triggers = [] + model = resource.content_type.model_class() for obj in model.objects.filter(**kwargs): - op = Operation(backend, obj, Operation.MONITOR) - monitorings.append(op) - # TODO async=True only when running with celery - # monitor.request.id - logs += Operation.execute(monitorings, async=async) - - kwargs = {'id__in': ids} if ids else {} - # Update used resources and trigger resource exceeded and revovery - triggers = [] - model = resource.content_type.model_class() - for obj in model.objects.filter(**kwargs): - data, __ = ResourceData.get_or_create(obj, resource) - data.update() - if not resource.disable_trigger: - a = data.used - b = data.allocated - if data.used > (data.allocated or 0): - op = Operation(backend, obj, Operation.EXCEEDED) - triggers.append(op) - elif data.used < (data.allocated or 0): - op = Operation(backend, obj, Operation.RECOVERY) - triggers.append(op) - Operation.execute(triggers) - return logs + data, __ = ResourceData.get_or_create(obj, resource) + data.update() + if not resource.disable_trigger: + a = data.used + b = data.allocated + if data.used > (data.allocated or 0): + op = Operation(backend, obj, Operation.EXCEEDED) + triggers.append(op) + elif data.used < (data.allocated or 0): + op = Operation(backend, obj, Operation.RECOVERY) + triggers.append(op) + Operation.execute(triggers) + return logs diff --git a/orchestra/contrib/webapps/backends/php.py b/orchestra/contrib/webapps/backends/php.py index 88b11c47..6e90b88f 100644 --- a/orchestra/contrib/webapps/backends/php.py +++ b/orchestra/contrib/webapps/backends/php.py @@ -1,5 +1,6 @@ import os import textwrap +from collections import OrderedDict from django.template import Template, Context from django.utils.translation import ugettext_lazy as _ @@ -115,19 +116,27 @@ class PHPBackend(WebAppServiceMixin, ServiceController): service php5-fpm reload fi # Coordinate apache restart with Apache2Backend - # FIXME race condition - locked=1 - state="$(grep -v 'PHPBackend' /dev/shm/restart.apache2)" || locked=0 - echo -n "$state" > /dev/shm/restart.apache2 + restart=0 + backend='PHPBackend' + mv /dev/shm/restart.apache2 /dev/shm/restart.apache2.locked || { + sleep 0.1 + mv /dev/shm/restart.apache2 /dev/shm/restart.apache2.locked + } + state="$(grep -v $backend /dev/shm/restart.apache2.locked)" || restart=1 + echo -n "$state" > /dev/shm/restart.apache2.locked if [[ $UPDATED_APACHE -eq 1 ]]; then - if [[ $locked == 0 ]]; then + if [[ $restart == 1 ]]; then service apache2 status && service apache2 reload || service apache2 start + rm /dev/shm/restart.apache2.locked else - echo "PHPBackend RESTART" >> /dev/shm/restart.apache2 + echo "$backend RESTART" >> /dev/shm/restart.apache2.locked + mv /dev/shm/restart.apache2.locked /dev/shm/restart.apache2 fi elif [[ "$state" =~ .*RESTART$ ]]; then - rm /dev/shm/restart.apache2 + rm /dev/shm/restart.apache2.locked service apache2 status && service apache2 reload || service apache2 start + else + mv /dev/shm/restart.apache2.locked /dev/shm/restart.apache2 fi """) ) @@ -135,7 +144,6 @@ class PHPBackend(WebAppServiceMixin, ServiceController): def get_options(self, webapp): kwargs = {} - print(webapp.data) if self.MERGE: kwargs = { 'webapp__account': webapp.account, @@ -195,10 +203,10 @@ class PHPBackend(WebAppServiceMixin, ServiceController): def get_fcgid_cmd_options(self, webapp, context): options = self.get_options(webapp) - maps = { - 'MaxProcesses': options.get('processes', None), - 'IOTimeout': options.get('timeout', None), - } + maps = OrderedDict( + MaxProcesses=options.get('processes', None), + IOTimeout=options.get('timeout', None), + ) cmd_options = [] for directive, value in maps.items(): if value: diff --git a/orchestra/contrib/websites/backends/apache.py b/orchestra/contrib/websites/backends/apache.py index fa469c59..b673d039 100644 --- a/orchestra/contrib/websites/backends/apache.py +++ b/orchestra/contrib/websites/backends/apache.py @@ -137,18 +137,28 @@ class Apache2Backend(ServiceController): def commit(self): """ reload Apache2 if necessary """ self.append(textwrap.dedent("""\ - locked=1 - state="$(grep -v 'Apache2Backend' /dev/shm/restart.apache2)" || locked=0 - echo -n "$state" > /dev/shm/restart.apache2 - if [[ $UPDATED == 1 ]]; then - if [[ $locked == 0 ]]; then + # Coordinate apache restart with Apache2Backend + restart=0 + backend='Apache2Backend' + mv /dev/shm/restart.apache2 /dev/shm/restart.apache2.locked || { + sleep 0.1 + mv /dev/shm/restart.apache2 /dev/shm/restart.apache2.locked + } + state="$(grep -v $backend /dev/shm/restart.apache2.locked)" || restart=1 + echo -n "$state" > /dev/shm/restart.apache2.locked + if [[ $UPDATED_APACHE -eq 1 ]]; then + if [[ $restart == 1 ]]; then service apache2 status && service apache2 reload || service apache2 start + rm /dev/shm/restart.apache2.locked else - echo "Apache2Backend RESTART" >> /dev/shm/restart.apache2 + echo "$backend RESTART" >> /dev/shm/restart.apache2.locked + mv /dev/shm/restart.apache2.locked /dev/shm/restart.apache2 fi elif [[ "$state" =~ .*RESTART$ ]]; then - rm /dev/shm/restart.apache2 + rm /dev/shm/restart.apache2.locked service apache2 status && service apache2 reload || service apache2 start + else + mv /dev/shm/restart.apache2.locked /dev/shm/restart.apache2 fi""") ) super(Apache2Backend, self).commit() diff --git a/orchestra/utils/sys.py b/orchestra/utils/sys.py index 3c5da067..c482cc65 100644 --- a/orchestra/utils/sys.py +++ b/orchestra/utils/sys.py @@ -156,3 +156,35 @@ def get_default_celeryd_username(): if user is None: raise CommandError("Can not find the default celeryd username") return user + + +class LockFile(object): + """ File-based lock mechanism used for preventing concurrency problems """ + def __init__(self, lockfile, expire=5*60, unlocked=False): + """ /dev/shm/ can be a good place for storing locks ;) """ + self.lockfile = lockfile + self.expire = expire + self.unlocked = unlocked + + def acquire(self): + if os.path.exists(self.lockfile): + lock_time = os.path.getmtime(self.lockfile) + # lock expires to avoid starvation + if time.time()-lock_time < self.expire: + return False + touch(self.lockfile) + return True + + def release(self): + os.remove(self.lockfile) + + def __enter__(self): + if not self.unlocked: + if not self.acquire(): + raise OperationLocked('%s lock file exists and its mtime is less ' + 'than %s seconds' % (self.lockfile, self.expire)) + return True + + def __exit__(self, type, value, traceback): + if not self.unlocked: + self.release()