Fixes on resource monitoring

This commit is contained in:
Marc 2014-07-11 21:09:17 +00:00
parent c54c084dff
commit 5344e732fc
10 changed files with 373 additions and 104 deletions

View file

@ -51,6 +51,9 @@ Remember that, as always with QuerySets, any subsequent chained methods which im
* create custom field that returns backend python objects * create custom field that returns backend python objects
* Timezone awareness on monitoring system (reading server-side logs with different TZ than orchestra) maybe a settings value? (use UTC internally, timezone.localtime() when interacting with servers) * Timezone awareness on monitoring system (reading server-side logs with different TZ than orchestra) maybe a settings value? (use UTC internally, timezone.localtime() when interacting with servers)
* Resource metric: KB MB B? * Resource metric: KB MB B? RESOURCE UNIT!! forms and serializers
* EMAIL backend operations which contain stderr messages (because under certain failures status code is still 0) * EMAIL backend operations which contain stderr messages (because under certain failures status code is still 0)
* Settings dictionary like DRF2 in order to better override large settings like WEBSITES_APPLICATIONS.etc

View file

@ -10,9 +10,9 @@ from .helpers import send_report
def as_task(execute): def as_task(execute):
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
# with db.transaction.commit_manually(): with db.transaction.commit_manually():
log = execute(*args, **kwargs) log = execute(*args, **kwargs)
# db.transaction.commit() db.transaction.commit()
if log.state != log.SUCCESS: if log.state != log.SUCCESS:
send_report(execute, args, log) send_report(execute, args, log)
return log return log

View file

@ -63,14 +63,17 @@ class ResourceAdmin(ExtendedModelAdmin):
class ResourceDataAdmin(admin.ModelAdmin): class ResourceDataAdmin(admin.ModelAdmin):
list_display = ('id', 'resource', 'used', 'allocated', 'last_update',) # TODO content_object list_display = ('id', 'resource', 'used', 'allocated', 'last_update', 'content_type') # TODO content_object
list_filter = ('resource',) list_filter = ('resource',)
class MonitorDataAdmin(admin.ModelAdmin): class MonitorDataAdmin(admin.ModelAdmin):
list_display = ('id', 'monitor', 'date', 'value') # TODO content_object list_display = ('id', 'monitor', 'date', 'value', 'ct', 'object_id') # TODO content_object
list_filter = ('monitor',) list_filter = ('monitor',)
def ct(self, i):
return i.content_type_id
admin.site.register(Resource, ResourceAdmin) admin.site.register(Resource, ResourceAdmin)
admin.site.register(ResourceData, ResourceDataAdmin) admin.site.register(ResourceData, ResourceDataAdmin)

View file

@ -12,6 +12,7 @@ class ServiceMonitor(ServiceBackend):
DISK = 'disk' DISK = 'disk'
MEMORY = 'memory' MEMORY = 'memory'
CPU = 'cpu' CPU = 'cpu'
# TODO UNITS
actions = ('monitor', 'resource_exceeded', 'resource_recovery') actions = ('monitor', 'resource_exceeded', 'resource_recovery')

View file

@ -1,13 +1,16 @@
import datetime import datetime
from django.db import models from django.db import models
from django.db.models.loading import get_model
from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation from django.contrib.contenttypes.fields import GenericForeignKey, GenericRelation
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
from django.core import validators from django.core import validators
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from djcelery.models import PeriodicTask, CrontabSchedule from djcelery.models import PeriodicTask, CrontabSchedule
from orchestra.models.fields import MultiSelectField from orchestra.models.fields import MultiSelectField
from orchestra.models.utils import get_model_field_path
from orchestra.utils.apps import autodiscover from orchestra.utils.apps import autodiscover
from .backends import ServiceMonitor from .backends import ServiceMonitor
@ -119,11 +122,13 @@ class ResourceData(models.Model):
@classmethod @classmethod
def get_or_create(cls, obj, resource): def get_or_create(cls, obj, resource):
ct = ContentType.objects.get_for_model(type(obj))
try: try:
return cls.objects.get(content_object=obj, resource=resource) return cls.objects.get(content_type=ct, object_id=obj.pk,
except cls.DoesNotExists: resource=resource)
except cls.DoesNotExist:
return cls.objects.create(content_object=obj, resource=resource, return cls.objects.create(content_object=obj, resource=resource,
allocated=resource.defalt_allocation) allocated=resource.default_allocation)
def get_used(self): def get_used(self):
resource = self.resource resource = self.resource
@ -131,8 +136,19 @@ class ResourceData(models.Model):
result = 0 result = 0
has_result = False has_result = False
for monitor in resource.monitors: for monitor in resource.monitors:
resource_model = self.content_type.model_class()
monitor_model = get_model(ServiceMonitor.get_backend(monitor).model)
if resource_model == monitor_model:
dataset = MonitorData.objects.filter(monitor=monitor, dataset = MonitorData.objects.filter(monitor=monitor,
content_type=self.content_type, object_id=self.object_id) content_type=self.content_type_id, object_id=self.object_id)
else:
path = get_model_field_path(monitor_model, resource_model)
fields = '__'.join(path)
objects = monitor_model.objects.filter(**{fields: self.object_id})
pks = objects.values_list('id', flat=True)
ct = ContentType.objects.get_for_model(monitor_model)
dataset = MonitorData.objects.filter(monitor=monitor,
content_type=ct, object_id__in=pks)
if resource.period == resource.MONTHLY_AVG: if resource.period == resource.MONTHLY_AVG:
try: try:
last = dataset.latest() last = dataset.latest()
@ -148,12 +164,13 @@ class ResourceData(models.Model):
slot = (previous-data.date).total_seconds() slot = (previous-data.date).total_seconds()
result += data.value * slot/total result += data.value * slot/total
elif resource.period == resource.MONTHLY_SUM: elif resource.period == resource.MONTHLY_SUM:
data = dataset.filter(date__year=today.year, data = dataset.filter(date__year=today.year, date__month=today.month)
date__month=today.month) # FIXME Aggregation of 0s returns None! django bug?
value = data.aggregate(models.Sum('value'))['value__sum'] # value = data.aggregate(models.Sum('value'))['value__sum']
if value: values = data.values_list('value', flat=True)
if values:
has_result = True has_result = True
result += value result += sum(values)
elif resource.period == resource.LAST: elif resource.period == resource.LAST:
try: try:
result += dataset.latest().value result += dataset.latest().value

View file

@ -1,9 +1,11 @@
from celery import shared_task from celery import shared_task
from django.db.models.loading import get_model
from django.utils import timezone from django.utils import timezone
from orchestra.apps.orchestration.models import BackendOperation as Operation from orchestra.apps.orchestration.models import BackendOperation as Operation
from .backends import ServiceMonitor from .backends import ServiceMonitor
from .models import MonitorData from .models import ResourceData, Resource
@shared_task(name='resources.Monitor') @shared_task(name='resources.Monitor')
@ -13,7 +15,7 @@ def monitor(resource_id):
# Execute monitors # Execute monitors
for monitor_name in resource.monitors: for monitor_name in resource.monitors:
backend = ServiceMonitor.get_backend(monitor_name) backend = ServiceMonitor.get_backend(monitor_name)
model = backend.model model = get_model(*backend.model.split('.'))
operations = [] operations = []
# Execute monitor # Execute monitor
for obj in model.objects.all(): for obj in model.objects.all():
@ -22,18 +24,18 @@ def monitor(resource_id):
# Update used resources and trigger resource exceeded and revovery # Update used resources and trigger resource exceeded and revovery
operations = [] operations = []
model = resource.model model = resource.content_type.model_class()
for obj in model.objects.all(): for obj in model.objects.all():
data = MonitorData.get_or_create(obj, resource) data = ResourceData.get_or_create(obj, resource)
current = data.get_used() current = data.get_used()
if not resource.disable_trigger: if not resource.disable_trigger:
if data.used < data.allocated and current > data.allocated: if data.used < data.allocated and current > data.allocated:
op = Operation.create(backend, data.content_object, Operation.EXCEED) op = Operation.create(backend, obj, Operation.EXCEED)
operations.append(op) operations.append(op)
elif res.used > res.allocated and current < res.allocated: elif data.used > data.allocated and current < data.allocated:
op = Operation.create(backend, data.content_object, Operation.RECOVERY) op = Operation.create(backend, obj, Operation.RECOVERY)
operation.append(op) operation.append(op)
data.used = current data.used = current or 0
data.las_update = timezone.now() data.last_update = timezone.now()
data.save() data.save()
Operation.execute(operations) Operation.execute(operations)

View file

@ -184,10 +184,11 @@ class Apache2Traffic(ServiceMonitor):
def monitor(self, site): def monitor(self, site):
context = self.get_context(site) context = self.get_context(site)
self.append(""" self.append("""{
awk 'BEGIN { awk 'BEGIN {
ini = "%(last_date)s" ini = "%(last_date)s"
end = "%(current_date)s" end = "%(current_date)s"
sum = 0
months["Jan"] = "01"; months["Jan"] = "01";
months["Feb"] = "02"; months["Feb"] = "02";
@ -212,16 +213,10 @@ class Apache2Traffic(ServiceMonitor):
second = substr(date, 19, 2) second = substr(date, 19, 2)
line_date = year month day hour minute second line_date = year month day hour minute second
if ( line_date > ini && line_date < end) if ( line_date > ini && line_date < end)
if ( $10 == "" ) sum += $NF
sum += $9
else
sum += $10
} END { } END {
if ( sum )
print sum print sum
else }' %(log_file)s || echo 0; } | xargs echo %(object_id)s """ % context)
print 0
}' %(log_file)s | xargs echo %(object_id)s """ % context)
def get_context(self, site): def get_context(self, site):
return { return {

View file

@ -10,8 +10,8 @@
### BEGIN INIT INFO ### BEGIN INIT INFO
# Provides: celerybeat # Provides: celerybeat
# Required-Start: $network $local_fs $remote_fs postgresql celeryd # Required-Start: $network $local_fs $remote_fs
# Required-Stop: $network $local_fs $remote_fs postgresql celeryd # Required-Stop: $network $local_fs $remote_fs
# Default-Start: 2 3 4 5 # Default-Start: 2 3 4 5
# Default-Stop: 0 1 6 # Default-Stop: 0 1 6
# Short-Description: celery periodic task scheduler # Short-Description: celery periodic task scheduler
@ -20,25 +20,104 @@
# Cannot use set -e/bash -e since the kill -0 command will abort # Cannot use set -e/bash -e since the kill -0 command will abort
# abnormally in the absence of a valid process ID. # abnormally in the absence of a valid process ID.
#set -e #set -e
VERSION=10.0
echo "celery init v${VERSION}."
DEFAULT_PID_FILE="/var/run/celery/beat.pid" if [ $(id -u) -ne 0 ]; then
DEFAULT_LOG_FILE="/var/log/celery/beat.log" echo "Error: This program can only be used by the root user."
DEFAULT_LOG_LEVEL="INFO" echo " Unpriviliged users must use 'celery beat --detach'"
DEFAULT_CELERYBEAT="celerybeat" exit 1
fi
# May be a runlevel symlink (e.g. S02celeryd)
if [ -L "$0" ]; then
SCRIPT_FILE=$(readlink "$0")
else
SCRIPT_FILE="$0"
fi
SCRIPT_NAME="$(basename "$SCRIPT_FILE")"
# /etc/init.d/celerybeat: start and stop the celery periodic task scheduler daemon. # /etc/init.d/celerybeat: start and stop the celery periodic task scheduler daemon.
# Make sure executable configuration script is owned by root
_config_sanity() {
local path="$1"
local owner=$(ls -ld "$path" | awk '{print $3}')
local iwgrp=$(ls -ld "$path" | cut -b 6)
local iwoth=$(ls -ld "$path" | cut -b 9)
if [ "$(id -u $owner)" != "0" ]; then
echo "Error: Config script '$path' must be owned by root!"
echo
echo "Resolution:"
echo "Review the file carefully and make sure it has not been "
echo "modified with mailicious intent. When sure the "
echo "script is safe to execute with superuser privileges "
echo "you can change ownership of the script:"
echo " $ sudo chown root '$path'"
exit 1
fi
if [ "$iwoth" != "-" ]; then # S_IWOTH
echo "Error: Config script '$path' cannot be writable by others!"
echo
echo "Resolution:"
echo "Review the file carefully and make sure it has not been "
echo "modified with malicious intent. When sure the "
echo "script is safe to execute with superuser privileges "
echo "you can change the scripts permissions:"
echo " $ sudo chmod 640 '$path'"
exit 1
fi
if [ "$iwgrp" != "-" ]; then # S_IWGRP
echo "Error: Config script '$path' cannot be writable by group!"
echo
echo "Resolution:"
echo "Review the file carefully and make sure it has not been "
echo "modified with malicious intent. When sure the "
echo "script is safe to execute with superuser privileges "
echo "you can change the scripts permissions:"
echo " $ sudo chmod 640 '$path'"
exit 1
fi
}
scripts=""
if test -f /etc/default/celeryd; then if test -f /etc/default/celeryd; then
scripts="/etc/default/celeryd"
_config_sanity /etc/default/celeryd
. /etc/default/celeryd . /etc/default/celeryd
fi fi
if test -f /etc/default/celerybeat; then EXTRA_CONFIG="/etc/default/${SCRIPT_NAME}"
. /etc/default/celerybeat if test -f "$EXTRA_CONFIG"; then
scripts="$scripts, $EXTRA_CONFIG"
_config_sanity "$EXTRA_CONFIG"
. "$EXTRA_CONFIG"
fi fi
echo "Using configuration: $scripts"
CELERY_BIN=${CELERY_BIN:-"celery"}
DEFAULT_USER="celery"
DEFAULT_PID_FILE="/var/run/celery/beat.pid"
DEFAULT_LOG_FILE="/var/log/celery/beat.log"
DEFAULT_LOG_LEVEL="INFO"
DEFAULT_CELERYBEAT="$CELERY_BIN beat"
CELERYBEAT=${CELERYBEAT:-$DEFAULT_CELERYBEAT} CELERYBEAT=${CELERYBEAT:-$DEFAULT_CELERYBEAT}
CELERYBEAT_LOG_LEVEL=${CELERYBEAT_LOG_LEVEL:-${CELERYBEAT_LOGLEVEL:-$DEFAULT_LOG_LEVEL}} CELERYBEAT_LOG_LEVEL=${CELERYBEAT_LOG_LEVEL:-${CELERYBEAT_LOGLEVEL:-$DEFAULT_LOG_LEVEL}}
# Sets --app argument for CELERY_BIN
CELERY_APP_ARG=""
if [ ! -z "$CELERY_APP" ]; then
CELERY_APP_ARG="--app=$CELERY_APP"
fi
CELERYBEAT_USER=${CELERYBEAT_USER:-${CELERYD_USER:-$DEFAULT_USER}}
# Set CELERY_CREATE_DIRS to always create log/pid dirs. # Set CELERY_CREATE_DIRS to always create log/pid dirs.
CELERY_CREATE_DIRS=${CELERY_CREATE_DIRS:-0} CELERY_CREATE_DIRS=${CELERY_CREATE_DIRS:-0}
CELERY_CREATE_RUNDIR=$CELERY_CREATE_DIRS CELERY_CREATE_RUNDIR=$CELERY_CREATE_DIRS
@ -64,16 +143,10 @@ CELERYBEAT_LOG_DIR=`dirname $CELERYBEAT_LOG_FILE`
CELERYBEAT_PID_DIR=`dirname $CELERYBEAT_PID_FILE` CELERYBEAT_PID_DIR=`dirname $CELERYBEAT_PID_FILE`
# Extra start-stop-daemon options, like user/group. # Extra start-stop-daemon options, like user/group.
if [ -n "$CELERYBEAT_USER" ]; then
DAEMON_OPTS="$DAEMON_OPTS --uid $CELERYBEAT_USER"
fi
if [ -n "$CELERYBEAT_GROUP" ]; then
DAEMON_OPTS="$DAEMON_OPTS --gid $CELERYBEAT_GROUP"
fi
CELERYBEAT_CHDIR=${CELERYBEAT_CHDIR:-$CELERYD_CHDIR} CELERYBEAT_CHDIR=${CELERYBEAT_CHDIR:-$CELERYD_CHDIR}
if [ -n "$CELERYBEAT_CHDIR" ]; then if [ -n "$CELERYBEAT_CHDIR" ]; then
DAEMON_OPTS="$DAEMON_OPTS --workdir $CELERYBEAT_CHDIR" DAEMON_OPTS="$DAEMON_OPTS --workdir=$CELERYBEAT_CHDIR"
fi fi
@ -155,7 +228,7 @@ wait_pid () {
stop_beat () { stop_beat () {
echo -n "Stopping celerybeat... " echo -n "Stopping ${SCRIPT_NAME}... "
if [ -f "$CELERYBEAT_PID_FILE" ]; then if [ -f "$CELERYBEAT_PID_FILE" ]; then
wait_pid $(cat "$CELERYBEAT_PID_FILE") wait_pid $(cat "$CELERYBEAT_PID_FILE")
else else
@ -163,12 +236,13 @@ stop_beat () {
fi fi
} }
_chuid () {
su "$CELERYBEAT_USER" -c "$CELERYBEAT $*"
}
start_beat () { start_beat () {
echo "Starting celerybeat..." echo "Starting ${SCRIPT_NAME}..."
if [ -n "$VIRTUALENV" ]; then _chuid $CELERY_APP_ARG $CELERYBEAT_OPTS $DAEMON_OPTS --detach \
source $VIRTUALENV/bin/activate
fi
$CELERYBEAT $CELERYBEAT_OPTS $DAEMON_OPTS --detach \
--pidfile="$CELERYBEAT_PID_FILE" --pidfile="$CELERYBEAT_PID_FILE"
} }
@ -203,10 +277,9 @@ case "$1" in
check_paths check_paths
;; ;;
*) *)
echo "Usage: /etc/init.d/celerybeat {start|stop|restart|create-paths}" echo "Usage: /etc/init.d/${SCRIPT_NAME} {start|stop|restart|create-paths}"
exit 64 # EX_USAGE exit 64 # EX_USAGE
;; ;;
esac esac
exit 0 exit 0

View file

@ -11,25 +11,106 @@
### BEGIN INIT INFO ### BEGIN INIT INFO
# Provides: celeryd # Provides: celeryd
# Required-Start: $network $local_fs $remote_fs postgresql celeryev rabbitmq-server # Required-Start: $network $local_fs $remote_fs
# Required-Stop: $network $local_fs $remote_fs postgresql celeryev rabbitmq-server # Required-Stop: $network $local_fs $remote_fs
# Default-Start: 2 3 4 5 # Default-Start: 2 3 4 5
# Default-Stop: 0 1 6 # Default-Stop: 0 1 6
# Short-Description: celery task worker daemon # Short-Description: celery task worker daemon
### END INIT INFO ### END INIT INFO
#
#
# To implement separate init scripts, copy this script and give it a different
# name:
# I.e., if my new application, "little-worker" needs an init, I
# should just use:
#
# cp /etc/init.d/celeryd /etc/init.d/little-worker
#
# You can then configure this by manipulating /etc/default/little-worker.
#
VERSION=10.0
echo "celery init v${VERSION}."
if [ $(id -u) -ne 0 ]; then
echo "Error: This program can only be used by the root user."
echo " Unprivileged users must use the 'celery multi' utility, "
echo " or 'celery worker --detach'."
exit 1
fi
# some commands work asyncronously, so we'll wait this many seconds
SLEEP_SECONDS=5
# Can be a runlevel symlink (e.g. S02celeryd)
if [ -L "$0" ]; then
SCRIPT_FILE=$(readlink "$0")
else
SCRIPT_FILE="$0"
fi
SCRIPT_NAME="$(basename "$SCRIPT_FILE")"
DEFAULT_USER="celery"
DEFAULT_PID_FILE="/var/run/celery/%n.pid" DEFAULT_PID_FILE="/var/run/celery/%n.pid"
DEFAULT_LOG_FILE="/var/log/celery/%n.log" DEFAULT_LOG_FILE="/var/log/celery/%n%I.log"
DEFAULT_LOG_LEVEL="INFO" DEFAULT_LOG_LEVEL="INFO"
DEFAULT_NODES="celery" DEFAULT_NODES="celery"
DEFAULT_CELERYD="-m celery.bin.celeryd_detach" DEFAULT_CELERYD="-m celery worker --detach"
CELERY_DEFAULTS=${CELERY_DEFAULTS:-"/etc/default/celeryd"} CELERY_DEFAULTS=${CELERY_DEFAULTS:-"/etc/default/${SCRIPT_NAME}"}
test -f "$CELERY_DEFAULTS" && . "$CELERY_DEFAULTS" # Make sure executable configuration script is owned by root
_config_sanity() {
local path="$1"
local owner=$(ls -ld "$path" | awk '{print $3}')
local iwgrp=$(ls -ld "$path" | cut -b 6)
local iwoth=$(ls -ld "$path" | cut -b 9)
if [ "$(id -u $owner)" != "0" ]; then
echo "Error: Config script '$path' must be owned by root!"
echo
echo "Resolution:"
echo "Review the file carefully and make sure it has not been "
echo "modified with mailicious intent. When sure the "
echo "script is safe to execute with superuser privileges "
echo "you can change ownership of the script:"
echo " $ sudo chown root '$path'"
exit 1
fi
if [ "$iwoth" != "-" ]; then # S_IWOTH
echo "Error: Config script '$path' cannot be writable by others!"
echo
echo "Resolution:"
echo "Review the file carefully and make sure it has not been "
echo "modified with malicious intent. When sure the "
echo "script is safe to execute with superuser privileges "
echo "you can change the scripts permissions:"
echo " $ sudo chmod 640 '$path'"
exit 1
fi
if [ "$iwgrp" != "-" ]; then # S_IWGRP
echo "Error: Config script '$path' cannot be writable by group!"
echo
echo "Resolution:"
echo "Review the file carefully and make sure it has not been "
echo "modified with malicious intent. When sure the "
echo "script is safe to execute with superuser privileges "
echo "you can change the scripts permissions:"
echo " $ sudo chmod 640 '$path'"
exit 1
fi
}
if [ -f "$CELERY_DEFAULTS" ]; then
_config_sanity "$CELERY_DEFAULTS"
echo "Using config script: $CELERY_DEFAULTS"
. "$CELERY_DEFAULTS"
fi
# Sets --app argument for CELERY_BIN
CELERY_APP_ARG=""
if [ ! -z "$CELERY_APP" ]; then
CELERY_APP_ARG="--app=$CELERY_APP"
fi
CELERYD_USER=${CELERYD_USER:-$DEFAULT_USER}
# Set CELERY_CREATE_DIRS to always create log/pid dirs. # Set CELERY_CREATE_DIRS to always create log/pid dirs.
CELERY_CREATE_DIRS=${CELERY_CREATE_DIRS:-0} CELERY_CREATE_DIRS=${CELERY_CREATE_DIRS:-0}
@ -45,8 +126,8 @@ if [ -z "$CELERYD_LOG_FILE" ]; then
fi fi
CELERYD_LOG_LEVEL=${CELERYD_LOG_LEVEL:-${CELERYD_LOGLEVEL:-$DEFAULT_LOG_LEVEL}} CELERYD_LOG_LEVEL=${CELERYD_LOG_LEVEL:-${CELERYD_LOGLEVEL:-$DEFAULT_LOG_LEVEL}}
CELERYD_MULTI=${CELERYD_MULTI:-"celeryd-multi"} CELERY_BIN=${CELERY_BIN:-"celery"}
CELERYD=${CELERYD:-$DEFAULT_CELERYD} CELERYD_MULTI=${CELERYD_MULTI:-"$CELERY_BIN multi"}
CELERYD_NODES=${CELERYD_NODES:-$DEFAULT_NODES} CELERYD_NODES=${CELERYD_NODES:-$DEFAULT_NODES}
export CELERY_LOADER export CELERY_LOADER
@ -59,13 +140,6 @@ CELERYD_LOG_DIR=`dirname $CELERYD_LOG_FILE`
CELERYD_PID_DIR=`dirname $CELERYD_PID_FILE` CELERYD_PID_DIR=`dirname $CELERYD_PID_FILE`
# Extra start-stop-daemon options, like user/group. # Extra start-stop-daemon options, like user/group.
if [ -n "$CELERYD_USER" ]; then
DAEMON_OPTS="$DAEMON_OPTS --uid=$CELERYD_USER"
fi
if [ -n "$CELERYD_GROUP" ]; then
DAEMON_OPTS="$DAEMON_OPTS --gid=$CELERYD_GROUP"
fi
if [ -n "$CELERYD_CHDIR" ]; then if [ -n "$CELERYD_CHDIR" ]; then
DAEMON_OPTS="$DAEMON_OPTS --workdir=$CELERYD_CHDIR" DAEMON_OPTS="$DAEMON_OPTS --workdir=$CELERYD_CHDIR"
fi fi
@ -125,58 +199,119 @@ create_paths() {
export PATH="${PATH:+$PATH:}/usr/sbin:/sbin" export PATH="${PATH:+$PATH:}/usr/sbin:/sbin"
_get_pid_files() { _get_pids() {
[ ! -d "$CELERYD_PID_DIR" ] && return found_pids=0
echo `ls -1 "$CELERYD_PID_DIR"/*.pid 2> /dev/null` my_exitcode=0
for pid_file in "$CELERYD_PID_DIR"/*.pid; do
local pid=`cat "$pid_file"`
local cleaned_pid=`echo "$pid" | sed -e 's/[^0-9]//g'`
if [ -z "$pid" ] || [ "$cleaned_pid" != "$pid" ]; then
echo "bad pid file ($pid_file)"
one_failed=true
my_exitcode=1
else
found_pids=1
echo "$pid"
fi
if [ $found_pids -eq 0 ]; then
echo "${SCRIPT_NAME}: All nodes down"
exit $my_exitcode
fi
done
} }
stop_workers () {
$CELERYD_MULTI stopwait $CELERYD_NODES --pidfile="$CELERYD_PID_FILE" _chuid () {
sleep $SLEEP_SECONDS su "$CELERYD_USER" -c "$CELERYD_MULTI $*"
} }
start_workers () { start_workers () {
$CELERYD_MULTI start $CELERYD_NODES $DAEMON_OPTS \ if [ ! -z "$CELERYD_ULIMIT" ]; then
ulimit $CELERYD_ULIMIT
fi
_chuid $* start $CELERYD_NODES $DAEMON_OPTS \
--pidfile="$CELERYD_PID_FILE" \ --pidfile="$CELERYD_PID_FILE" \
--logfile="$CELERYD_LOG_FILE" \ --logfile="$CELERYD_LOG_FILE" \
--loglevel="$CELERYD_LOG_LEVEL" \ --loglevel="$CELERYD_LOG_LEVEL" \
--cmd="$CELERYD" \ $CELERY_APP_ARG \
$CELERYD_OPTS $CELERYD_OPTS
sleep $SLEEP_SECONDS }
dryrun () {
(C_FAKEFORK=1 start_workers --verbose)
}
stop_workers () {
_chuid stopwait $CELERYD_NODES --pidfile="$CELERYD_PID_FILE"
} }
restart_workers () { restart_workers () {
$CELERYD_MULTI restart $CELERYD_NODES $DAEMON_OPTS \ _chuid restart $CELERYD_NODES $DAEMON_OPTS \
--pidfile="$CELERYD_PID_FILE" \ --pidfile="$CELERYD_PID_FILE" \
--logfile="$CELERYD_LOG_FILE" \ --logfile="$CELERYD_LOG_FILE" \
--loglevel="$CELERYD_LOG_LEVEL" \ --loglevel="$CELERYD_LOG_LEVEL" \
--cmd="$CELERYD" \ $CELERY_APP_ARG \
$CELERYD_OPTS $CELERYD_OPTS
sleep $SLEEP_SECONDS
} }
kill_workers() {
_chuid kill $CELERYD_NODES --pidfile="$CELERYD_PID_FILE"
}
restart_workers_graceful () {
local worker_pids=
worker_pids=`_get_pids`
[ "$one_failed" ] && exit 1
for worker_pid in $worker_pids; do
local failed=
kill -HUP $worker_pid 2> /dev/null || failed=true
if [ "$failed" ]; then
echo "${SCRIPT_NAME} worker (pid $worker_pid) could not be restarted"
one_failed=true
else
echo "${SCRIPT_NAME} worker (pid $worker_pid) received SIGHUP"
fi
done
[ "$one_failed" ] && exit 1 || exit 0
}
check_status () { check_status () {
local pid_files= my_exitcode=0
pid_files=`_get_pid_files` found_pids=0
[ -z "$pid_files" ] && echo "celeryd not running (no pidfile)" && exit 1
local one_failed= local one_failed=
for pid_file in $pid_files; do for pid_file in "$CELERYD_PID_DIR"/*.pid; do
if [ ! -r $pid_file ]; then
echo "${SCRIPT_NAME} is stopped: no pids were found"
one_failed=true
break
fi
local node=`basename "$pid_file" .pid` local node=`basename "$pid_file" .pid`
local pid=`cat "$pid_file"` local pid=`cat "$pid_file"`
local cleaned_pid=`echo "$pid" | sed -e 's/[^0-9]//g'` local cleaned_pid=`echo "$pid" | sed -e 's/[^0-9]//g'`
if [ -z "$pid" ] || [ "$cleaned_pid" != "$pid" ]; then if [ -z "$pid" ] || [ "$cleaned_pid" != "$pid" ]; then
echo "bad pid file ($pid_file)" echo "bad pid file ($pid_file)"
one_failed=true
else else
local failed= local failed=
kill -0 $pid 2> /dev/null || failed=true kill -0 $pid 2> /dev/null || failed=true
if [ "$failed" ]; then if [ "$failed" ]; then
echo "celeryd (node $node) (pid $pid) is stopped, but pid file exists!" echo "${SCRIPT_NAME} (node $node) (pid $pid) is stopped, but pid file exists!"
one_failed=true one_failed=true
else else
echo "celeryd (node $node) (pid $pid) is running..." echo "${SCRIPT_NAME} (node $node) (pid $pid) is running..."
fi fi
fi fi
done done
@ -211,24 +346,42 @@ case "$1" in
check_paths check_paths
restart_workers restart_workers
;; ;;
graceful)
check_dev_null
restart_workers_graceful
;;
kill)
check_dev_null
kill_workers
;;
dryrun)
check_dev_null
dryrun
;;
try-restart) try-restart)
check_dev_null check_dev_null
check_paths check_paths
restart_workers restart_workers
;; ;;
create-paths) create-paths)
check_dev_null check_dev_null
create_paths create_paths
;; ;;
check-paths) check-paths)
check_dev_null check_dev_null
check_paths check_paths
;; ;;
*) *)
echo "Usage: /etc/init.d/celeryd {start|stop|restart|kill|create-paths}" echo "Usage: /etc/init.d/${SCRIPT_NAME} {start|stop|restart|graceful|kill|dryrun|create-paths}"
exit 64 # EX_USAGE exit 64 # EX_USAGE
;; ;;
esac esac
exit 0 exit 0

View file

@ -47,3 +47,25 @@ def get_field_value(obj, field_name):
rel = getattr(rel.get(), name) rel = getattr(rel.get(), name)
return rel return rel
def get_model_field_path(origin, target):
""" BFS search on model relaion fields """
mqueue = []
mqueue.append([origin])
pqueue = [[]]
while mqueue:
model = mqueue.pop(0)
path = pqueue.pop(0)
if len(model) > 4:
raise RuntimeError('maximum recursion depth exceeded while looking for %s" % target')
node = model[-1]
if node == target:
return path
for field in node._meta.fields:
if field.rel:
new_model = list(model)
new_model.append(field.rel.to)
mqueue.append(new_model)
new_path = list(path)
new_path.append(field.name)
pqueue.append(new_path)