django-orchestra/orchestra/utils/sys.py

222 lines
6.6 KiB
Python
Raw Normal View History

2014-05-08 16:59:35 +00:00
import errno
import fcntl
import getpass
import os
import re
import select
import subprocess
import sys
2015-05-09 15:37:35 +00:00
import time
2014-05-08 16:59:35 +00:00
from django.core.management.base import CommandError
def check_root(func):
    """
    Function decorator that checks if user has root permissions.

    The decorated callable only runs when ``getpass.getuser()`` reports
    ``root``; otherwise ``CommandError`` is raised, naming the last component
    of the module in which the callable was defined.
    """
    # Imported locally so this block does not depend on the module import list
    from functools import wraps

    @wraps(func)  # preserve the decorated callable's name and docstring
    def wrapped(*args, **kwargs):
        if getpass.getuser() != 'root':
            cmd_name = func.__module__.split('.')[-1]
            msg = "Sorry, '%s' must be executed as a superuser (root)"
            raise CommandError(msg % cmd_name)
        return func(*args, **kwargs)
    return wrapped
2015-05-05 19:42:55 +00:00
def check_non_root(func):
    """
    Function decorator that checks that the user does NOT have root permissions.

    The decorated callable only runs when ``getpass.getuser()`` reports a user
    other than ``root``; otherwise ``CommandError`` is raised, naming the last
    component of the module in which the callable was defined.
    """
    # Imported locally so this block does not depend on the module import list
    from functools import wraps

    @wraps(func)  # preserve the decorated callable's name and docstring
    def wrapped(*args, **kwargs):
        if getpass.getuser() == 'root':
            cmd_name = func.__module__.split('.')[-1]
            raise CommandError("Sorry, you don't want to execute '%s' as superuser (root)." % cmd_name)
        return func(*args, **kwargs)
    return wrapped
2015-02-24 09:34:26 +00:00
class _Attribute:
    """
    Plain holder object: stores the value given as ``stdout`` and, being a
    regular instance, accepts any further attributes assigned to it later.
    """

    def __init__(self, stdout):
        self.stdout = stdout
2014-05-08 16:59:35 +00:00
def make_async(fd):
    """ Put a file descriptor in non-blocking mode by adding O_NONBLOCK to its status flags """
    current_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
def read_async(fd):
    """
    Read whatever ``fd.read()`` returns; an IOError carrying EAGAIN is turned
    into an empty string, any other IOError is propagated to the caller
    """
    try:
        data = fd.read()
    except IOError as exc:
        if exc.errno == errno.EAGAIN:
            # Nothing available right now on a non-blocking descriptor
            return ''
        raise exc
    return data
2014-05-08 16:59:35 +00:00
2015-05-03 17:44:46 +00:00
def runiterator(command, display=False, stdin=b''):
    """
    Subprocess wrapper for running commands concurrently.

    Generator protocol: the first ``next()`` starts ``command`` under
    ``/bin/bash``, writes ``stdin`` to it and yields ``None``. Each following
    iteration yields an ``_Attribute`` state holding the ``stdout`` and
    ``stderr`` bytes read since the previous state, the ``exit_code``
    (``None`` while the process has not finished) and the ``command``.
    The generator ends after yielding the state that carries the exit code.

    :param command: shell command line, executed with ``shell=True``
    :param display: echo the command and its output on this process' streams
    :param stdin: bytes written to the command's standard input
    """
    if display:
        sys.stderr.write("\n\033[1m $ %s\033[0m\n" % command)

    p = subprocess.Popen(command, shell=True, executable='/bin/bash',
            stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE)

    p.stdin.write(stdin)
    p.stdin.close()
    yield

    make_async(p.stdout)
    make_async(p.stderr)
    # Async reading of stdout and stderr
    while True:
        # Wait until at least one of the pipes is readable (data or EOF)
        select.select([p.stdout, p.stderr], [], [])
        # read_async may return None or '' when nothing is available
        stdout = read_async(p.stdout) or b''
        stderr = read_async(p.stderr) or b''

        exit_code = p.poll()
        if exit_code is not None:
            # Output written between the reads above and poll() would
            # otherwise be lost, so drain both pipes once more
            stdout += read_async(p.stdout) or b''
            stderr += read_async(p.stderr) or b''

        # A chunk can end in the middle of a multi-byte character; replace
        # undecodable bytes instead of aborting the command because of display
        if display and stdout:
            sys.stdout.write(stdout.decode('utf8', errors='replace'))
        if display and stderr:
            sys.stderr.write(stderr.decode('utf8', errors='replace'))

        state = _Attribute(stdout)
        state.stderr = stderr
        state.exit_code = exit_code
        state.command = command
        yield state

        if state.exit_code is not None:
            p.stdout.close()
            p.stderr.close()
            # A plain return ends the generator; raising StopIteration here
            # is converted into RuntimeError since PEP 479
            return
2015-05-09 17:08:45 +00:00
def join(iterator, display=False, silent=False, valid_codes=(0,)):
    """
    Joins the iterator process: consumes every state and aggregates the result.

    :param iterator: iterable of states exposing ``stdout``, ``stderr`` and
        ``exit_code`` (plus ``command``, used for the error report)
    :param display: also write a highlighted error report to stderr on failure
    :param silent: do not raise on failure, only flag it on the result
    :param valid_codes: exit codes regarded as success
    :returns: ``_Attribute`` whose ``stdout`` is the stripped accumulated
        output, with ``stderr``, ``exit_code``, ``failed`` and ``succeeded`` set
    :raises CommandError: when the exit code is not in ``valid_codes`` and
        ``silent`` is false
    """
    stdout_chunks = []
    stderr_chunks = []
    # Stays None when the iterator yields nothing at all
    last_state = None
    for last_state in iterator:
        stdout_chunks.append(last_state.stdout)
        stderr_chunks.append(last_state.stderr)
    exit_code = last_state.exit_code if last_state is not None else None

    out = _Attribute(b''.join(stdout_chunks).strip())
    err = b''.join(stderr_chunks).strip()
    out.failed = False
    out.exit_code = exit_code
    out.stderr = err
    if exit_code not in valid_codes:
        out.failed = True
        msg = "\nrun() encountered an error (return code %s) while executing '%s'\n"
        msg = msg % (exit_code, getattr(last_state, 'command', None))
        # Interpolating bytes with %s would render as "b'...'"; decode for the report
        err_text = err.decode('utf8', errors='replace') if isinstance(err, bytes) else err
        if display:
            sys.stderr.write("\n\033[1;31mCommandError: %s %s\033[m\n" % (msg, err_text))
        if not silent:
            raise CommandError("%s %s" % (msg, err_text))
    out.succeeded = not out.failed
    return out
2015-05-09 17:08:45 +00:00
def run(command, display=False, valid_codes=(0,), silent=False, stdin=b'', run_async=False,
        **kwargs):
    """
    Run ``command`` through ``runiterator``.

    :param command: shell command line to execute
    :param display: forwarded to ``runiterator`` and ``join``
    :param valid_codes: exit codes regarded as success (forwarded to ``join``)
    :param silent: forwarded to ``join``
    :param stdin: bytes fed to the command's standard input
    :param run_async: when true, return the already started iterator instead
        of waiting for the command; the legacy ``async`` keyword is still
        honoured when supplied through ``**kwargs``
    :returns: the started iterator when asynchronous, else the ``join`` result
    :raises TypeError: on any keyword argument other than the legacy ``async``
    """
    # ``async`` is a reserved word since Python 3.7 and cannot be a parameter
    # name any more; keep accepting it for callers that pass it via a mapping
    if 'async' in kwargs:
        run_async = kwargs.pop('async')
    if kwargs:
        raise TypeError("run() got an unexpected keyword argument '%s'" % next(iter(kwargs)))

    iterator = runiterator(command, display, stdin)
    # The first step starts the process and writes its stdin
    next(iterator)
    if run_async:
        return iterator
    return join(iterator, display=display, silent=silent, valid_codes=valid_codes)
2015-05-03 17:44:46 +00:00
def sshrun(addr, command, *args, executable='bash', persist=False, **kwargs):
    """
    Pipe ``command`` (UTF-8 encoded) into ``executable`` started over ssh as
    root on ``addr``; extra positional and keyword arguments go to ``run``.
    With ``persist`` the ssh control master options are added, using
    ``settings.ORCHESTRA_SSH_CONTROL_PATH`` as control path.
    """
    from .. import settings
    ssh_options = ['stricthostkeychecking=no', 'BatchMode=yes', 'EscapeChar=none']
    if persist:
        ssh_options += [
            'ControlMaster=auto',
            'ControlPersist=yes',
            'ControlPath=' + settings.ORCHESTRA_SSH_CONTROL_PATH,
        ]
    # The template supplies the first "-o"; the join supplies the remaining ones
    option_flags = ' -o '.join(ssh_options)
    cmd = 'ssh -o {options} -C root@{addr} {executable}'.format(
        addr=addr, executable=executable, options=option_flags)
    payload = command.encode('utf8')
    return run(cmd, *args, stdin=payload, **kwargs)
2014-10-03 14:02:11 +00:00
2014-05-08 16:59:35 +00:00
def get_default_celeryd_username():
    """
    Introspect celeryd defaults file in order to get its username.

    Scans ``/etc/default/celeryd`` for lines containing ``CELERYD_USER=`` and
    returns the first double-quoted value of the last such line that has one.

    :raises CommandError: when the file cannot be read or holds no such value
    """
    user = None
    try:
        with open('/etc/default/celeryd') as celeryd_defaults:
            for line in celeryd_defaults:
                if 'CELERYD_USER=' in line:
                    quoted = re.findall('"([^"]*)"', line)
                    # A line without a double-quoted value carries no usable name
                    if quoted:
                        user = quoted[0]
    except OSError:
        # Missing or unreadable defaults file: reported below as "not found".
        # Unrelated exceptions are no longer masked by a catch-all finally.
        pass
    if user is None:
        raise CommandError("Can not find the default celeryd username")
    return user
2015-05-06 10:51:12 +00:00
2015-05-06 14:39:25 +00:00
def touch(fname, mode=0o666, dir_fd=None, **kwargs):
    """
    Create ``fname`` if it does not exist and update its timestamps.

    :param fname: path of the file, relative to ``dir_fd`` when that is given
    :param mode: permission bits used if the file has to be created
    :param dir_fd: optional directory descriptor ``fname`` is relative to
    :param kwargs: forwarded to ``os.utime`` (e.g. ``times`` or ``ns``)
    """
    flags = os.O_CREAT | os.O_APPEND
    # Whether os.utime accepts an open descriptor on this platform. The former
    # code tested the truthiness of the whole os.supports_fd set when deciding
    # to drop dir_fd, which loses dir_fd where utime itself lacks fd support.
    utime_by_fd = os.utime in os.supports_fd
    with os.fdopen(os.open(fname, flags=flags, mode=mode, dir_fd=dir_fd)) as f:
        if utime_by_fd:
            os.utime(f.fileno(), **kwargs)
        else:
            os.utime(fname, dir_fd=dir_fd, **kwargs)
2015-05-09 15:37:35 +00:00
class OperationLocked(Exception):
    """ Raised by LockFile when entering a lock that cannot be acquired """
2015-05-06 10:51:12 +00:00
class LockFile(object):
    """
    File-based lock mechanism used for preventing concurrency problems.

    Usable as a context manager: entering acquires the lock (raising
    ``OperationLocked`` when it is held) and leaving releases it.
    With ``unlocked=True`` both steps are skipped.
    """
    def __init__(self, lockfile, expire=5*60, unlocked=False):
        # /dev/shm/ can be a good place for storing locks
        self.lockfile = lockfile
        # Seconds after which an existing lock file is considered stale
        self.expire = expire
        self.unlocked = unlocked

    def acquire(self):
        """ Take the lock; returns False when a non-expired lock file exists """
        try:
            lock_time = os.path.getmtime(self.lockfile)
        except OSError:
            # No lock file: asking for the mtime directly avoids crashing
            # when the file vanishes between an existence check and the read
            pass
        else:
            # lock expires to avoid starvation
            if time.time() - lock_time < self.expire:
                return False
        touch(self.lockfile)
        return True

    def release(self):
        """ Remove the lock file; a file that is already gone is not an error """
        try:
            os.remove(self.lockfile)
        except FileNotFoundError:
            # e.g. the lock expired and its next holder already released it
            pass

    def __enter__(self):
        if not self.unlocked:
            if not self.acquire():
                raise OperationLocked("%s lock file exists and its mtime is less than %s seconds" %
                                      (self.lockfile, self.expire))
        return True

    def __exit__(self, type, value, traceback):
        if not self.unlocked:
            self.release()
2015-05-07 19:00:02 +00:00
def touch_wsgi(delay=0):
    """
    Start a background shell job that sleeps ``delay`` seconds and then
    touches ``wsgi.py`` inside the directory given by ``paths.get_project_dir()``.
    NOTE(review): presumably this triggers a WSGI application reload -- confirm.
    """
    from . import paths
    command = '{ sleep %i && touch %s/wsgi.py; } &' % (delay, paths.get_project_dir())
    # ``async`` is a reserved word since Python 3.7 and cannot be written as a
    # literal keyword argument; passing it through a mapping keeps this valid
    run(command, **{'async': True})