Non-blocking beat

This commit is contained in:
Marc Aymerich 2015-05-06 10:51:12 +00:00
parent d9a7eefb07
commit 31ac035c27
7 changed files with 155 additions and 105 deletions

View file

@ -386,3 +386,6 @@ try: import uwsgi to know its running uwsgi
# don't block on beat, and --report periodic tasks
# Deprecate restart/start/stop services (do touch wsgi.py and fuck celery)
# high perf apache sync,

View file

@ -16,7 +16,7 @@ from datetime import datetime, timedelta
from celery.schedules import crontab_parser as CrontabParser
from orchestra.utils.sys import run, join
from orchestra.utils.sys import run, join, LockFile
class Setting(object):
@ -126,27 +126,21 @@ def fire_pending_messages(settings, db):
if __name__ == "__main__":
# TODO aquire lock
manage = sys.argv[1]
procs = []
settings = Setting(manage).get_settings()
db = DB(settings)
db.connect()
try:
if 'orchestra.contrib.tasks' in settings['INSTALLED_APPS']:
if settings.get('TASKS_BACKEND', 'thread') in ('thread', 'process'):
for proc in fire_pending_tasks(manage, db):
with LockFile('/dev/shm/beat.lock', expire=20):
manage = sys.argv[1]
procs = []
settings = Setting(manage).get_settings()
db = DB(settings)
db.connect()
try:
# Non-blocking loop, we need to finish this in time for the next minute.
if 'orchestra.contrib.tasks' in settings['INSTALLED_APPS']:
if settings.get('TASKS_BACKEND', 'thread') in ('thread', 'process'):
for proc in fire_pending_tasks(manage, db):
procs.append(proc)
if 'orchestra.contrib.mailer' in settings['INSTALLED_APPS']:
for proc in fire_pending_messages(settings, db):
procs.append(proc)
if 'orchestra.contrib.mailer' in settings['INSTALLED_APPS']:
for proc in fire_pending_messages(settings, db):
procs.append(proc)
exit_code = 0
for proc in procs:
result = join(proc)
sys.stdout.write(result.stdout.decode('utf8'))
sys.stderr.write(result.stderr.decode('utf8'))
if result.return_code != 0:
exit_code = result.return_code
finally:
db.close()
sys.exit(exit_code)
finally:
db.close()
sys.exit(0)

View file

@ -4,6 +4,8 @@ from socket import error as SocketError
from django.core.mail import get_connection
from django.utils.encoding import smart_str
from orchestra.utils.sys import LockFile
from .models import Message
@ -29,23 +31,23 @@ def send_message(message, num=0, connection=None, bulk=100):
def send_pending(bulk=100):
# TODO aquire lock
connection = None
num = 0
for message in Message.objects.filter(state=Message.QUEUED).order_by('priority'):
send_message(message, num, connection, bulk)
num += 1
from django.utils import timezone
from . import settings
from datetime import timedelta
from django.db.models import Q
with LockFile('/dev/shm/mailer.send_pending.lock'):
connection = None
num = 0
for message in Message.objects.filter(state=Message.QUEUED).order_by('priority'):
send_message(message, num, connection, bulk)
num += 1
from django.utils import timezone
from . import settings
from datetime import timedelta
from django.db.models import Q
now = timezone.now()
qs = Q()
for retries, seconds in enumerate(settings.MAILER_DEFERE_SECONDS):
delta = timedelta(seconds=seconds)
qs = qs | Q(retries=retries, last_retry__lte=now-delta)
for message in Message.objects.filter(state=Message.DEFERRED).filter(qs).order_by('priority'):
send_message(message, num, connection, bulk)
if connection is not None:
connection.close()
now = timezone.now()
qs = Q()
for retries, seconds in enumerate(settings.MAILER_DEFERE_SECONDS):
delta = timedelta(seconds=seconds)
qs = qs | Q(retries=retries, last_retry__lte=now-delta)
for message in Message.objects.filter(state=Message.DEFERRED).filter(qs).order_by('priority'):
send_message(message, num, connection, bulk)
if connection is not None:
connection.close()

View file

@ -1,52 +1,53 @@
from orchestra.contrib.orchestration import Operation
from orchestra.contrib.tasks import task
from orchestra.models.utils import get_model_field_path
from orchestra.utils.sys import LockFile
from .backends import ServiceMonitor
@task(name='resources.Monitor')
def monitor(resource_id, ids=None, async=True):
from .models import ResourceData, Resource
with LockFile('/dev/shm/resources.monitor.lock', expire=60*60):
from .models import ResourceData, Resource
resource = Resource.objects.get(pk=resource_id)
resource_model = resource.content_type.model_class()
logs = []
# Execute monitors
for monitor_name in resource.monitors:
backend = ServiceMonitor.get_backend(monitor_name)
model = backend.model_class()
kwargs = {}
if ids:
path = get_model_field_path(model, resource_model)
path = '%s__in' % ('__'.join(path) or 'id')
kwargs = {
path: ids
}
# Execute monitor
monitorings = []
for obj in model.objects.filter(**kwargs):
op = Operation(backend, obj, Operation.MONITOR)
monitorings.append(op)
# TODO async=True only when running with celery
# monitor.request.id
logs += Operation.execute(monitorings, async=async)
resource = Resource.objects.get(pk=resource_id)
resource_model = resource.content_type.model_class()
logs = []
# Execute monitors
for monitor_name in resource.monitors:
backend = ServiceMonitor.get_backend(monitor_name)
model = backend.model_class()
kwargs = {}
if ids:
path = get_model_field_path(model, resource_model)
path = '%s__in' % ('__'.join(path) or 'id')
kwargs = {
path: ids
}
# Execute monitor
monitorings = []
kwargs = {'id__in': ids} if ids else {}
# Update used resources and trigger resource exceeded and revovery
triggers = []
model = resource.content_type.model_class()
for obj in model.objects.filter(**kwargs):
op = Operation(backend, obj, Operation.MONITOR)
monitorings.append(op)
# TODO async=True only when running with celery
# monitor.request.id
logs += Operation.execute(monitorings, async=async)
kwargs = {'id__in': ids} if ids else {}
# Update used resources and trigger resource exceeded and revovery
triggers = []
model = resource.content_type.model_class()
for obj in model.objects.filter(**kwargs):
data, __ = ResourceData.get_or_create(obj, resource)
data.update()
if not resource.disable_trigger:
a = data.used
b = data.allocated
if data.used > (data.allocated or 0):
op = Operation(backend, obj, Operation.EXCEEDED)
triggers.append(op)
elif data.used < (data.allocated or 0):
op = Operation(backend, obj, Operation.RECOVERY)
triggers.append(op)
Operation.execute(triggers)
return logs
data, __ = ResourceData.get_or_create(obj, resource)
data.update()
if not resource.disable_trigger:
a = data.used
b = data.allocated
if data.used > (data.allocated or 0):
op = Operation(backend, obj, Operation.EXCEEDED)
triggers.append(op)
elif data.used < (data.allocated or 0):
op = Operation(backend, obj, Operation.RECOVERY)
triggers.append(op)
Operation.execute(triggers)
return logs

View file

@ -1,5 +1,6 @@
import os
import textwrap
from collections import OrderedDict
from django.template import Template, Context
from django.utils.translation import ugettext_lazy as _
@ -115,19 +116,27 @@ class PHPBackend(WebAppServiceMixin, ServiceController):
service php5-fpm reload
fi
# Coordinate apache restart with Apache2Backend
# FIXME race condition
locked=1
state="$(grep -v 'PHPBackend' /dev/shm/restart.apache2)" || locked=0
echo -n "$state" > /dev/shm/restart.apache2
restart=0
backend='PHPBackend'
mv /dev/shm/restart.apache2 /dev/shm/restart.apache2.locked || {
sleep 0.1
mv /dev/shm/restart.apache2 /dev/shm/restart.apache2.locked
}
state="$(grep -v $backend /dev/shm/restart.apache2.locked)" || restart=1
echo -n "$state" > /dev/shm/restart.apache2.locked
if [[ $UPDATED_APACHE -eq 1 ]]; then
if [[ $locked == 0 ]]; then
if [[ $restart == 1 ]]; then
service apache2 status && service apache2 reload || service apache2 start
rm /dev/shm/restart.apache2.locked
else
echo "PHPBackend RESTART" >> /dev/shm/restart.apache2
echo "$backend RESTART" >> /dev/shm/restart.apache2.locked
mv /dev/shm/restart.apache2.locked /dev/shm/restart.apache2
fi
elif [[ "$state" =~ .*RESTART$ ]]; then
rm /dev/shm/restart.apache2
rm /dev/shm/restart.apache2.locked
service apache2 status && service apache2 reload || service apache2 start
else
mv /dev/shm/restart.apache2.locked /dev/shm/restart.apache2
fi
""")
)
@ -135,7 +144,6 @@ class PHPBackend(WebAppServiceMixin, ServiceController):
def get_options(self, webapp):
kwargs = {}
print(webapp.data)
if self.MERGE:
kwargs = {
'webapp__account': webapp.account,
@ -195,10 +203,10 @@ class PHPBackend(WebAppServiceMixin, ServiceController):
def get_fcgid_cmd_options(self, webapp, context):
options = self.get_options(webapp)
maps = {
'MaxProcesses': options.get('processes', None),
'IOTimeout': options.get('timeout', None),
}
maps = OrderedDict(
MaxProcesses=options.get('processes', None),
IOTimeout=options.get('timeout', None),
)
cmd_options = []
for directive, value in maps.items():
if value:

View file

@ -137,18 +137,28 @@ class Apache2Backend(ServiceController):
def commit(self):
""" reload Apache2 if necessary """
self.append(textwrap.dedent("""\
locked=1
state="$(grep -v 'Apache2Backend' /dev/shm/restart.apache2)" || locked=0
echo -n "$state" > /dev/shm/restart.apache2
if [[ $UPDATED == 1 ]]; then
if [[ $locked == 0 ]]; then
# Coordinate apache restart with Apache2Backend
restart=0
backend='Apache2Backend'
mv /dev/shm/restart.apache2 /dev/shm/restart.apache2.locked || {
sleep 0.1
mv /dev/shm/restart.apache2 /dev/shm/restart.apache2.locked
}
state="$(grep -v $backend /dev/shm/restart.apache2.locked)" || restart=1
echo -n "$state" > /dev/shm/restart.apache2.locked
if [[ $UPDATED_APACHE -eq 1 ]]; then
if [[ $restart == 1 ]]; then
service apache2 status && service apache2 reload || service apache2 start
rm /dev/shm/restart.apache2.locked
else
echo "Apache2Backend RESTART" >> /dev/shm/restart.apache2
echo "$backend RESTART" >> /dev/shm/restart.apache2.locked
mv /dev/shm/restart.apache2.locked /dev/shm/restart.apache2
fi
elif [[ "$state" =~ .*RESTART$ ]]; then
rm /dev/shm/restart.apache2
rm /dev/shm/restart.apache2.locked
service apache2 status && service apache2 reload || service apache2 start
else
mv /dev/shm/restart.apache2.locked /dev/shm/restart.apache2
fi""")
)
super(Apache2Backend, self).commit()

View file

@ -156,3 +156,35 @@ def get_default_celeryd_username():
if user is None:
raise CommandError("Can not find the default celeryd username")
return user
class LockFile(object):
""" File-based lock mechanism used for preventing concurrency problems """
def __init__(self, lockfile, expire=5*60, unlocked=False):
""" /dev/shm/ can be a good place for storing locks ;) """
self.lockfile = lockfile
self.expire = expire
self.unlocked = unlocked
def acquire(self):
if os.path.exists(self.lockfile):
lock_time = os.path.getmtime(self.lockfile)
# lock expires to avoid starvation
if time.time()-lock_time < self.expire:
return False
touch(self.lockfile)
return True
def release(self):
os.remove(self.lockfile)
def __enter__(self):
if not self.unlocked:
if not self.acquire():
raise OperationLocked('%s lock file exists and its mtime is less '
'than %s seconds' % (self.lockfile, self.expire))
return True
def __exit__(self, type, value, traceback):
if not self.unlocked:
self.release()