From 9d2d0befc4936174beb3b4d806a48999f0586181 Mon Sep 17 00:00:00 2001 From: Santiago Lamora Date: Tue, 30 Mar 2021 12:51:12 +0200 Subject: [PATCH] Rename `async`--> `run_async` On Python3.5 async becames a reserved keyword. --- orchestra/bin/orchestra-beat | 34 ++++----- orchestra/contrib/orchestration/__init__.py | 28 ++++---- orchestra/contrib/orchestration/admin.py | 48 ++++++------- orchestra/contrib/orchestration/backends.py | 52 +++++++------- orchestra/contrib/orchestration/helpers.py | 18 ++--- .../management/commands/orchestrate.py | 12 ++-- orchestra/contrib/orchestration/manager.py | 14 ++-- orchestra/contrib/orchestration/methods.py | 16 ++--- .../0009_rename_route_async_run_async.py | 25 +++++++ orchestra/contrib/orchestration/models.py | 52 +++++++------- orchestra/contrib/orchestration/utils.py | 10 +-- orchestra/contrib/resources/actions.py | 6 +- orchestra/contrib/resources/models.py | 70 +++++++++---------- orchestra/contrib/resources/tasks.py | 4 +- orchestra/contrib/tasks/beat.py | 6 +- orchestra/contrib/tasks/utils.py | 2 +- orchestra/utils/sys.py | 40 +++++------ 17 files changed, 231 insertions(+), 206 deletions(-) create mode 100644 orchestra/contrib/orchestration/migrations/0009_rename_route_async_run_async.py diff --git a/orchestra/bin/orchestra-beat b/orchestra/bin/orchestra-beat index 09d13fa2..b11eda09 100755 --- a/orchestra/bin/orchestra-beat +++ b/orchestra/bin/orchestra-beat @@ -27,7 +27,7 @@ class crontab_parser(object): _range = r'(\w+?)-(\w+)' _steps = r'/(\w+)?' _star = r'\*' - + def __init__(self, max_=60, min_=0): self.max_ = max_ self.min_ = min_ @@ -45,14 +45,14 @@ class crontab_parser(object): raise self.ParseException('empty part') acc |= set(self._parse_part(part)) return acc - + def _parse_part(self, part): for regex, handler in self.pats: m = regex.match(part) if m: return handler(m.groups()) return self._expand_range((part, )) - + def _expand_range(self, toks): fr = self._expand_number(toks[0]) if len(toks) > 1: @@ -62,19 +62,19 @@ class crontab_parser(object): list(range(self.min_, to + 1))) return list(range(fr, to + 1)) return [fr] - + def _range_steps(self, toks): if len(toks) != 3 or not toks[2]: raise self.ParseException('empty filter') return self._expand_range(toks[:2])[::int(toks[2])] - + def _star_steps(self, toks): if not toks or not toks[0]: raise self.ParseException('empty filter') return self._expand_star()[::int(toks[0])] def _expand_star(self, *args): return list(range(self.min_, self.max_ + self.min_)) - + def _expand_number(self, s): if isinstance(s, str) and s[0] == '-': raise self.ParseException('negative numbers not supported') @@ -99,7 +99,7 @@ class Setting(object): def __init__(self, manage): self.manage = manage self.settings_file = self.get_settings_file(manage) - + def get_settings(self): """ get db settings from settings.py file without importing """ settings = {'__file__': self.settings_file} @@ -111,7 +111,7 @@ class Setting(object): content += line exec(content, settings) return settings - + def get_settings_file(self, manage): with open(manage, 'r') as handler: regex = re.compile(r'"DJANGO_SETTINGS_MODULE"\s*,\s*"([^"]+)"') @@ -128,7 +128,7 @@ class Setting(object): class DB(object): def __init__(self, settings): self.settings = settings['DATABASES']['default'] - + def connect(self): if self.settings['ENGINE'] == 'django.db.backends.sqlite3': import sqlite3 @@ -138,7 +138,7 @@ class DB(object): self.conn = psycopg2.connect("dbname='{NAME}' user='{USER}' host='{HOST}' password='{PASSWORD}'".format(**self.settings)) else: raise ValueError("%s engine not supported." % self.settings['ENGINE']) - + def query(self, query): cur = self.conn.cursor() try: @@ -147,7 +147,7 @@ class DB(object): finally: cur.close() return result - + def close(self): self.conn.close() @@ -161,7 +161,7 @@ def fire_pending_tasks(manage, db): "WHERE p.crontab_id = c.id AND p.enabled = {}" ).format(enabled) return db.query(query) - + def is_due(now, minute, hour, day_of_week, day_of_month, month_of_year): n_minute, n_hour, n_day_of_week, n_day_of_month, n_month_of_year = now return ( @@ -171,14 +171,14 @@ def fire_pending_tasks(manage, db): n_day_of_month in crontab_parser(31, 1).parse(day_of_month) and n_month_of_year in crontab_parser(12, 1).parse(month_of_year) ) - + now = datetime.utcnow() now = tuple(map(int, now.strftime("%M %H %w %d %m").split())) for minute, hour, day_of_week, day_of_month, month_of_year, task_id in get_tasks(db): if is_due(now, minute, hour, day_of_week, day_of_month, month_of_year): command = 'python3 -W ignore::DeprecationWarning {manage} runtask {task_id}'.format( manage=manage, task_id=task_id) - proc = run(command, async=True) + proc = run(command, run_async=True) yield proc @@ -187,7 +187,7 @@ def fire_pending_messages(settings, db): MAILER_DEFERE_SECONDS = settings.get('MAILER_DEFERE_SECONDS', (300, 600, 60*60, 60*60*24)) now = datetime.utcnow() query_or = [] - + for num, seconds in enumerate(MAILER_DEFERE_SECONDS): delta = timedelta(seconds=seconds) epoch = now-delta @@ -198,10 +198,10 @@ def fire_pending_messages(settings, db): WHERE (mailer_message.state = 'QUEUED' OR (mailer_message.state = 'DEFERRED' AND (%s))) LIMIT 1""" % ' OR '.join(query_or) return bool(db.query(query)) - + if has_pending_messages(settings, db): command = 'python3 -W ignore::DeprecationWarning {manage} sendpendingmessages'.format(manage=manage) - proc = run(command, async=True) + proc = run(command, run_async=True) yield proc diff --git a/orchestra/contrib/orchestration/__init__.py b/orchestra/contrib/orchestration/__init__.py index b5aded20..c4774481 100644 --- a/orchestra/contrib/orchestration/__init__.py +++ b/orchestra/contrib/orchestration/__init__.py @@ -15,21 +15,21 @@ class Operation(): MONITOR = 'monitor' EXCEEDED = 'exceeded' RECOVERY = 'recovery' - + def __str__(self): return '%s.%s(%s)' % (self.backend, self.action, self.instance) - + def __repr__(self): return str(self) - + def __hash__(self): """ set() """ return hash((self.backend, self.instance, self.action)) - + def __eq__(self, operation): """ set() """ return hash(self) == hash(operation) - + def __init__(self, backend, instance, action, routes=None): self.backend = backend # instance should maintain any dynamic attribute until backend execution @@ -37,13 +37,13 @@ class Operation(): self.instance = copy.deepcopy(instance) self.action = action self.routes = routes - + @classmethod - def execute(cls, operations, serialize=False, async=None): + def execute(cls, operations, serialize=False, run_async=None): from . import manager scripts, backend_serialize = manager.generate(operations) - return manager.execute(scripts, serialize=(serialize or backend_serialize), async=async) - + return manager.execute(scripts, serialize=(serialize or backend_serialize), run_async=run_async) + @classmethod def create_for_action(cls, instances, action): if not isinstance(instances, collections.Iterable): @@ -56,13 +56,13 @@ class Operation(): cls(backend_cls, instance, action) ) return operations - + @classmethod def execute_action(cls, instances, action): """ instances can be an object or an iterable for batch processing """ operations = cls.create_for_action(instances, action) return cls.execute(operations) - + def preload_context(self): """ Heuristic: Running get_context will prevent most of related objects do not exist errors @@ -70,7 +70,7 @@ class Operation(): if self.action == self.DELETE: if hasattr(self.backend, 'get_context'): self.backend().get_context(self.instance) - + def store(self, log): from .models import BackendOperation return BackendOperation.objects.create( @@ -79,7 +79,7 @@ class Operation(): instance=self.instance, action=self.action, ) - + @classmethod def load(cls, operation, log=None): routes = None @@ -88,4 +88,4 @@ class Operation(): (operation.backend, operation.action): AttrDict(host=log.server) } return cls(operation.backend_class, operation.instance, operation.action, routes=routes) - + diff --git a/orchestra/contrib/orchestration/admin.py b/orchestra/contrib/orchestration/admin.py index da9cfe01..60737d09 100644 --- a/orchestra/contrib/orchestration/admin.py +++ b/orchestra/contrib/orchestration/admin.py @@ -30,25 +30,25 @@ STATE_COLORS = { class RouteAdmin(ExtendedModelAdmin): list_display = ( - 'display_backend', 'host', 'match', 'display_model', 'display_actions', 'async', + 'display_backend', 'host', 'match', 'display_model', 'display_actions', 'run_async', 'is_active' ) - list_editable = ('host', 'match', 'async', 'is_active') - list_filter = ('host', 'is_active', 'async', 'backend') + list_editable = ('host', 'match', 'run_async', 'is_active') + list_filter = ('host', 'is_active', 'run_async', 'backend') list_prefetch_related = ('host',) ordering = ('backend',) - add_fields = ('backend', 'host', 'match', 'async', 'is_active') + add_fields = ('backend', 'host', 'match', 'run_async', 'is_active') change_form = RouteForm actions = (orchestrate,) change_view_actions = actions - + BACKEND_HELP_TEXT = helpers.get_backends_help_text(ServiceBackend.get_backends()) DEFAULT_MATCH = { backend.get_name(): backend.default_route_match for backend in ServiceBackend.get_backends() } - + display_backend = display_plugin_field('backend') - + def display_model(self, route): try: return escape(route.backend_class.model) @@ -56,7 +56,7 @@ class RouteAdmin(ExtendedModelAdmin): return "NOT AVAILABLE" display_model.short_description = _("model") display_model.allow_tags = True - + def display_actions(self, route): try: return '
'.join(route.backend_class.get_actions()) @@ -64,7 +64,7 @@ class RouteAdmin(ExtendedModelAdmin): return "NOT AVAILABLE" display_actions.short_description = _("actions") display_actions.allow_tags = True - + def formfield_for_dbfield(self, db_field, **kwargs): """ Provides dynamic help text on backend form field """ if db_field.name == 'backend': @@ -79,23 +79,23 @@ class RouteAdmin(ExtendedModelAdmin): request._host_choices_cache = choices = list(field.choices) field.choices = choices return field - + def get_form(self, request, obj=None, **kwargs): """ Include dynamic help text for existing objects """ form = super(RouteAdmin, self).get_form(request, obj, **kwargs) if obj: form.base_fields['backend'].help_text = self.BACKEND_HELP_TEXT.get(obj.backend, '') return form - + def show_orchestration_disabled(self, request): if settings.ORCHESTRATION_DISABLE_EXECUTION: msg = _("Orchestration execution is disabled by ORCHESTRATION_DISABLE_EXECUTION setting.") self.message_user(request, mark_safe(msg), messages.WARNING) - + def changelist_view(self, request, extra_context=None): self.show_orchestration_disabled(request) return super(RouteAdmin, self).changelist_view(request, extra_context) - + def changeform_view(self, request, object_id=None, form_url='', extra_context=None): self.show_orchestration_disabled(request) return super(RouteAdmin, self).changeform_view( @@ -108,12 +108,12 @@ class BackendOperationInline(admin.TabularInline): readonly_fields = ('action', 'instance_link') extra = 0 can_delete = False - + class Media: css = { 'all': ('orchestra/css/hide-inline-id.css',) } - + def instance_link(self, operation): link = admin_link('instance')(self, operation) if link == '---': @@ -122,10 +122,10 @@ class BackendOperationInline(admin.TabularInline): return link instance_link.allow_tags = True instance_link.short_description = _("Instance") - + def has_add_permission(self, *args, **kwargs): return False - + def get_queryset(self, request): queryset = super(BackendOperationInline, self).get_queryset(request) return queryset.prefetch_related('instance') @@ -149,7 +149,7 @@ class BackendLogAdmin(ChangeViewActionsMixin, admin.ModelAdmin): readonly_fields = fields actions = (retry_backend,) change_view_actions = actions - + server_link = admin_link('server') display_created = admin_date('created_at', short_description=_("Created")) display_state = admin_colored('state', colors=STATE_COLORS) @@ -157,17 +157,17 @@ class BackendLogAdmin(ChangeViewActionsMixin, admin.ModelAdmin): mono_stdout = display_mono('stdout') mono_stderr = display_mono('stderr') mono_traceback = display_mono('traceback') - + class Media: css = { 'all': ('orchestra/css/pygments/github.css',) } - + def get_queryset(self, request): """ Order by structured name and imporve performance """ qs = super(BackendLogAdmin, self).get_queryset(request) return qs.select_related('server').defer('script', 'stdout') - + def has_add_permission(self, *args, **kwargs): return False @@ -177,17 +177,17 @@ class ServerAdmin(ExtendedModelAdmin): list_filter = ('os',) actions = (orchestrate,) change_view_actions = actions - + def display_ping(self, instance): return self._remote_state[instance.pk][0] display_ping.short_description = _("Ping") display_ping.allow_tags = True - + def display_uptime(self, instance): return self._remote_state[instance.pk][1] display_uptime.short_description = _("Uptime") display_uptime.allow_tags = True - + def get_queryset(self, request): """ Order by structured name and imporve performance """ qs = super(ServerAdmin, self).get_queryset(request) diff --git a/orchestra/contrib/orchestration/backends.py b/orchestra/contrib/orchestration/backends.py index cf861102..b2f22267 100644 --- a/orchestra/contrib/orchestration/backends.py +++ b/orchestra/contrib/orchestration/backends.py @@ -31,7 +31,7 @@ class ServiceMount(plugins.PluginMount): class ServiceBackend(plugins.Plugin, metaclass=ServiceMount): """ Service management backend base class - + It uses the _unit of work_ design principle, which allows bulk operations to be conviniently supported. Each backend generates the configuration for all the changes of all modified objects, reloading the daemon just once. @@ -52,15 +52,15 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount): # By default backend will not run if actions do not generate insctructions, # If your backend uses prepare() or commit() only then you should set force_empty_action_execution = True force_empty_action_execution = False - + def __str__(self): return type(self).__name__ - + def __init__(self): self.head = [] self.content = [] self.tail = [] - + def __getattribute__(self, attr): """ Select head, content or tail section depending on the method name """ IGNORE_ATTRS = ( @@ -83,29 +83,29 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount): elif attr not in IGNORE_ATTRS and attr in self.actions: self.set_content() return super(ServiceBackend, self).__getattribute__(attr) - + def set_head(self): self.cmd_section = self.head - + def set_tail(self): self.cmd_section = self.tail - + def set_content(self): self.cmd_section = self.content - + @classmethod def get_actions(cls): return [ action for action in cls.actions if action in dir(cls) ] - + @classmethod def get_name(cls): return cls.__name__ - + @classmethod def is_main(cls, obj): opts = obj._meta return cls.model == '%s.%s' % (opts.app_label, opts.object_name) - + @classmethod def get_related(cls, obj): opts = obj._meta @@ -122,7 +122,7 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount): return related.all() return [related] return [] - + @classmethod def get_backends(cls, instance=None, action=None): backends = cls.get_plugins() @@ -140,15 +140,15 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount): if include: included.append(backend) return included - + @classmethod def get_backend(cls, name): return cls.get(name) - + @classmethod def model_class(cls): return apps.get_model(cls.model) - + @property def scripts(self): """ group commands based on their method """ @@ -163,12 +163,12 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount): except KeyError: pass return list(scripts.items()) - + def get_banner(self): now = timezone.localtime(timezone.now()) time = now.strftime("%h %d, %Y %I:%M:%S %Z") return "Generated by Orchestra at %s" % time - + def create_log(self, server, **kwargs): from .models import BackendLog state = BackendLog.RECEIVED @@ -181,8 +181,8 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount): manager = manager.using(using) log = manager.create(backend=self.get_name(), state=state, server=server) return log - - def execute(self, server, async=False, log=None): + + def execute(self, server, run_async=False, log=None): from .models import BackendLog if log is None: log = self.create_log(server) @@ -190,11 +190,11 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount): if run: scripts = self.scripts for method, commands in scripts: - method(log, server, commands, async) + method(log, server, commands, run_async) if log.state != BackendLog.SUCCESS: break return log - + def append(self, *cmd): # aggregate commands acording to its execution method if isinstance(cmd[0], str): @@ -207,10 +207,10 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount): self.cmd_section.append((method, [cmd])) else: self.cmd_section[-1][1].append(cmd) - + def get_context(self, obj): return {} - + def prepare(self): """ hook for executing something at the beging @@ -221,7 +221,7 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount): set -o pipefail exit_code=0""") ) - + def commit(self): """ hook for executing something at the end @@ -235,11 +235,11 @@ class ServiceBackend(plugins.Plugin, metaclass=ServiceMount): class ServiceController(ServiceBackend): actions = ('save', 'delete') abstract = True - + @classmethod def get_verbose_name(cls): return _("[S] %s") % super(ServiceController, cls).get_verbose_name() - + @classmethod def get_backends(cls): """ filter controller classes """ diff --git a/orchestra/contrib/orchestration/helpers.py b/orchestra/contrib/orchestration/helpers.py index 7f0deae8..798164d0 100644 --- a/orchestra/contrib/orchestration/helpers.py +++ b/orchestra/contrib/orchestration/helpers.py @@ -105,7 +105,7 @@ def get_backend_url(ids): def get_messages(logs): messages = [] - total, successes, async = 0, 0, 0 + total, successes, run_async = 0, 0, 0 ids = [] async_ids = [] for log in logs: @@ -118,17 +118,17 @@ def get_messages(logs): if log.is_success: successes += 1 elif not log.has_finished: - async += 1 + run_async += 1 async_ids.append(log.id) - errors = total-successes-async + errors = total-successes-run_async url = get_backend_url(ids) async_url = get_backend_url(async_ids) async_msg = '' - if async: + if run_async: async_msg = ungettext( _('{name} is running on the background'), - _('{async} backends are running on the background'), - async) + _('{run_async} backends are running on the background'), + run_async) if errors: if total == 1: msg = _('{name} has fail to execute') @@ -139,7 +139,7 @@ def get_messages(logs): errors) if async_msg: msg += ', ' + str(async_msg) - msg = msg.format(errors=errors, async=async, async_url=async_url, total=total, url=url, + msg = msg.format(errors=errors, run_async=run_async, async_url=async_url, total=total, url=url, name=log.backend) messages.append(('error', msg + '.')) elif successes: @@ -158,12 +158,12 @@ def get_messages(logs): _('{total} backends have been executed'), total) msg = msg.format( - total=total, url=url, async_url=async_url, async=async, successes=successes, + total=total, url=url, async_url=async_url, run_async=run_async, successes=successes, name=log.backend ) messages.append(('success', msg + '.')) else: - msg = async_msg.format(url=url, async_url=async_url, async=async, name=log.backend) + msg = async_msg.format(url=url, async_url=async_url, run_async=run_async, name=log.backend) messages.append(('success', msg + '.')) return messages diff --git a/orchestra/contrib/orchestration/management/commands/orchestrate.py b/orchestra/contrib/orchestration/management/commands/orchestrate.py index 4b076f73..211f1b59 100644 --- a/orchestra/contrib/orchestration/management/commands/orchestrate.py +++ b/orchestra/contrib/orchestration/management/commands/orchestrate.py @@ -12,7 +12,7 @@ from orchestra.utils.sys import confirm class Command(BaseCommand): help = 'Runs orchestration backends.' - + def add_arguments(self, parser): parser.add_argument('model', nargs='?', help='Label of a model to execute the orchestration.') @@ -30,8 +30,8 @@ class Command(BaseCommand): help='List available baclends.') parser.add_argument('--dry-run', action='store_true', dest='dry', default=False, help='Only prints scrtipt.') - - + + def collect_operations(self, **options): model = options.get('model') backends = options.get('backends') or set() @@ -66,7 +66,7 @@ class Command(BaseCommand): model = apps.get_model(*model.split('.')) queryset = model.objects.filter(**kwargs).order_by('id') querysets = [queryset] - + operations = OrderedSet() route_cache = {} for queryset in querysets: @@ -88,7 +88,7 @@ class Command(BaseCommand): result.append(operation) operations = result return operations - + def handle(self, *args, **options): list_backends = options.get('list_backends') if list_backends: @@ -116,7 +116,7 @@ class Command(BaseCommand): if not confirm("\n\nAre your sure to execute the previous scripts on %(servers)s (yes/no)? " % context): return if not dry: - logs = manager.execute(scripts, serialize=serialize, async=True) + logs = manager.execute(scripts, serialize=serialize, run_async=True) running = list(logs) stdout = 0 stderr = 0 diff --git a/orchestra/contrib/orchestration/manager.py b/orchestra/contrib/orchestration/manager.py index eafe8422..62572d04 100644 --- a/orchestra/contrib/orchestration/manager.py +++ b/orchestra/contrib/orchestration/manager.py @@ -97,12 +97,12 @@ def generate(operations): return scripts, serialize -def execute(scripts, serialize=False, async=None): +def execute(scripts, serialize=False, run_async=None): """ executes the operations on the servers - + serialize: execute one backend at a time - async: do not join threads (overrides route.async) + run_async: do not join threads (overrides route.run_async) """ if settings.ORCHESTRATION_DISABLE_EXECUTION: logger.info('Orchestration execution is dissabled by ORCHESTRATION_DISABLE_EXECUTION.') @@ -115,12 +115,12 @@ def execute(scripts, serialize=False, async=None): route, __, async_action = key backend, operations = value args = (route.host,) - if async is None: - is_async = not serialize and (route.async or async_action) + if run_async is None: + is_async = not serialize and (route.run_async or async_action) else: - is_async = not serialize and (async or async_action) + is_async = not serialize and (run_async or async_action) kwargs = { - 'async': is_async, + 'run_async': is_async, } # we clone the connection just in case we are isolated inside a transaction with db.clone(model=BackendLog) as handle: diff --git a/orchestra/contrib/orchestration/methods.py b/orchestra/contrib/orchestration/methods.py index db665a0d..cd3d7a22 100644 --- a/orchestra/contrib/orchestration/methods.py +++ b/orchestra/contrib/orchestration/methods.py @@ -17,7 +17,7 @@ from . import settings logger = logging.getLogger(__name__) -def Paramiko(backend, log, server, cmds, async=False, paramiko_connections={}): +def Paramiko(backend, log, server, cmds, run_async=False, paramiko_connections={}): """ Executes cmds to remote server using Pramaiko """ @@ -55,7 +55,7 @@ def Paramiko(backend, log, server, cmds, async=False, paramiko_connections={}): channel.shutdown_write() # Log results logger.debug('%s running on %s' % (backend, server)) - if async: + if run_async: second = False while True: # Non-blocking is the secret ingridient in the async sauce @@ -78,7 +78,7 @@ def Paramiko(backend, log, server, cmds, async=False, paramiko_connections={}): else: log.stdout += channel.makefile('rb', -1).read().decode('utf-8') log.stderr += channel.makefile_stderr('rb', -1).read().decode('utf-8') - + log.exit_code = channel.recv_exit_status() log.state = log.SUCCESS if log.exit_code == 0 else log.FAILURE logger.debug('%s execution state on %s is %s' % (backend, server, log.state)) @@ -97,7 +97,7 @@ def Paramiko(backend, log, server, cmds, async=False, paramiko_connections={}): channel.close() -def OpenSSH(backend, log, server, cmds, async=False): +def OpenSSH(backend, log, server, cmds, run_async=False): """ Executes cmds to remote server using SSH with connection resuse for maximum performance """ @@ -110,9 +110,9 @@ def OpenSSH(backend, log, server, cmds, async=False): return try: ssh = sshrun(server.get_address(), script, executable=backend.script_executable, - persist=True, async=async, silent=True) + persist=True, run_async=run_async, silent=True) logger.debug('%s running on %s' % (backend, server)) - if async: + if run_async: for state in ssh: log.stdout += state.stdout.decode('utf8') log.stderr += state.stderr.decode('utf8') @@ -148,7 +148,7 @@ def SSH(*args, **kwargs): return method(*args, **kwargs) -def Python(backend, log, server, cmds, async=False): +def Python(backend, log, server, cmds, run_async=False): script = '' functions = set() for cmd in cmds: @@ -170,7 +170,7 @@ def Python(backend, log, server, cmds, async=False): log.stdout += line + '\n' if result: log.stdout += '# Result: %s\n' % result - if async: + if run_async: log.save(update_fields=('stdout', 'updated_at')) except: log.exit_code = 1 diff --git a/orchestra/contrib/orchestration/migrations/0009_rename_route_async_run_async.py b/orchestra/contrib/orchestration/migrations/0009_rename_route_async_run_async.py new file mode 100644 index 00000000..f70deeeb --- /dev/null +++ b/orchestra/contrib/orchestration/migrations/0009_rename_route_async_run_async.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.10.5 on 2021-03-30 10:49 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('orchestration', '0008_auto_20190805_1134'), + ] + + operations = [ + migrations.RenameField( + model_name='route', + old_name='async', + new_name='run_async', + ), + migrations.AlterField( + model_name='route', + name='backend', + field=models.CharField(choices=[('Apache2Traffic', '[M] Apache 2 Traffic'), ('ApacheTrafficByName', '[M] ApacheTrafficByName'), ('DokuWikiMuTraffic', '[M] DokuWiki MU Traffic'), ('DovecotMaildirDisk', '[M] Dovecot Maildir size'), ('Exim4Traffic', '[M] Exim4 traffic'), ('MailmanSubscribers', '[M] Mailman subscribers'), ('MailmanTraffic', '[M] Mailman traffic'), ('MysqlDisk', '[M] MySQL disk'), ('PostfixMailscannerTraffic', '[M] Postfix-Mailscanner traffic'), ('ProxmoxOpenVZTraffic', '[M] ProxmoxOpenVZTraffic'), ('UNIXUserDisk', '[M] UNIX user disk'), ('VsFTPdTraffic', '[M] VsFTPd traffic'), ('WordpressMuTraffic', '[M] Wordpress MU Traffic'), ('NextCloudDiskQuota', '[M] nextCloud SaaS Disk Quota'), ('NextcloudTraffic', '[M] nextCloud SaaS Traffic'), ('OwnCloudDiskQuota', '[M] ownCloud SaaS Disk Quota'), ('OwncloudTraffic', '[M] ownCloud SaaS Traffic'), ('PhpListTraffic', '[M] phpList SaaS Traffic'), ('Apache2Controller', '[S] Apache 2'), ('BSCWController', '[S] BSCW SaaS'), ('Bind9MasterDomainController', '[S] Bind9 master domain'), ('Bind9SlaveDomainController', '[S] Bind9 slave domain'), ('DokuWikiMuController', '[S] DokuWiki multisite'), ('DrupalMuController', '[S] Drupal multisite'), ('GitLabSaaSController', '[S] GitLab SaaS'), ('LetsEncryptController', "[S] Let's encrypt!"), ('LxcController', '[S] LxcController'), ('AutoresponseController', '[S] Mail autoresponse'), ('MailmanController', '[S] Mailman'), ('MailmanVirtualDomainController', '[S] Mailman virtdomain-only'), ('MoodleController', '[S] Moodle'), ('MoodleWWWRootController', '[S] Moodle WWWRoot (required)'), ('MoodleMuController', '[S] Moodle multisite'), ('MySQLController', '[S] MySQL database'), ('MySQLUserController', '[S] MySQL user'), ('PHPController', '[S] PHP FPM/FCGID'), ('PostfixAddressController', '[S] Postfix address'), ('PostfixAddressVirtualDomainController', '[S] Postfix address virtdomain-only'), ('ProxmoxOVZ', '[S] ProxmoxOVZ'), ('uWSGIPythonController', '[S] Python uWSGI'), ('RoundcubeIdentityController', '[S] Roundcube Identity Controller'), ('StaticController', '[S] Static'), ('SymbolicLinkController', '[S] Symbolic link webapp'), ('UNIXUserMaildirController', '[S] UNIX maildir user'), ('UNIXUserController', '[S] UNIX user'), ('WebalizerAppController', '[S] Webalizer App'), ('WebalizerController', '[S] Webalizer Content'), ('WordPressForceSSLController', '[S] WordPress Force SSL'), ('WordPressURLController', '[S] WordPress URL'), ('WordPressController', '[S] Wordpress'), ('WordpressMuController', '[S] Wordpress multisite'), ('NextCloudController', '[S] nextCloud SaaS'), ('OwnCloudController', '[S] ownCloud SaaS'), ('PhpListSaaSController', '[S] phpList SaaS')], max_length=256, verbose_name='backend'), + ), + ] diff --git a/orchestra/contrib/orchestration/models.py b/orchestra/contrib/orchestration/models.py index 2952b72f..37d74702 100644 --- a/orchestra/contrib/orchestration/models.py +++ b/orchestra/contrib/orchestration/models.py @@ -33,22 +33,22 @@ class Server(models.Model): os = models.CharField(_("operative system"), max_length=32, choices=settings.ORCHESTRATION_OS_CHOICES, default=settings.ORCHESTRATION_DEFAULT_OS) - + def __str__(self): return self.name or str(self.address) - + def get_address(self): if self.address: return self.address return self.name - + def get_ip(self): address = self.get_address() try: return validate_ip_address(address) except ValidationError: return socket.gethostbyname(self.name) - + def clean(self): self.name = self.name.strip() self.address = self.address.strip() @@ -75,7 +75,7 @@ class BackendLog(models.Model): NOTHING = 'NOTHING' # Special state for mocked backendlogs EXCEPTION = 'EXCEPTION' - + STATES = ( (RECEIVED, RECEIVED), (TIMEOUT, TIMEOUT), @@ -87,7 +87,7 @@ class BackendLog(models.Model): (REVOKED, REVOKED), (NOTHING, NOTHING), ) - + backend = models.CharField(_("backend"), max_length=256) state = models.CharField(_("state"), max_length=16, choices=STATES, default=RECEIVED) server = models.ForeignKey(Server, verbose_name=_("server"), related_name='execution_logs') @@ -100,25 +100,25 @@ class BackendLog(models.Model): help_text="Celery task ID when used as execution backend") created_at = models.DateTimeField(_("created"), auto_now_add=True, db_index=True) updated_at = models.DateTimeField(_("updated"), auto_now=True) - + class Meta: get_latest_by = 'id' - + def __str__(self): return "%s@%s" % (self.backend, self.server) - + @property def execution_time(self): return (self.updated_at-self.created_at).total_seconds() - + @property def has_finished(self): return self.state not in (self.STARTED, self.RECEIVED) - + @property def is_success(self): return self.state in (self.SUCCESS, self.NOTHING) - + def backend_class(self): return ServiceBackend.get_backend(self.backend) @@ -141,20 +141,20 @@ class BackendOperation(models.Model): content_type = models.ForeignKey(ContentType) object_id = models.PositiveIntegerField(null=True) instance_repr = models.CharField(_("instance representation"), max_length=256) - + instance = GenericForeignKey('content_type', 'object_id') objects = BackendOperationQuerySet.as_manager() - + class Meta: verbose_name = _("Operation") verbose_name_plural = _("Operations") index_together = ( ('content_type', 'object_id'), ) - + def __str__(self): return '%s.%s(%s)' % (self.backend, self.action, self.instance or self.instance_repr) - + @cached_property def backend_class(self): return ServiceBackend.get_backend(self.backend) @@ -203,7 +203,7 @@ class Route(models.Model): match = models.CharField(_("match"), max_length=256, blank=True, default='True', help_text=_("Python expression used for selecting the targe host, " "instance referes to the current object.")) - async = models.BooleanField(default=False, + run_async = models.BooleanField(default=False, help_text=_("Whether or not block the request/response cycle waitting this backend to " "finish its execution. Usually you want slave servers to run asynchronously.")) async_actions = MultiSelectField(max_length=256, blank=True, @@ -211,19 +211,19 @@ class Route(models.Model): # method = models.CharField(_("method"), max_lenght=32, choices=method_choices, # default=MethodBackend.get_default()) is_active = models.BooleanField(_("active"), default=True) - + objects = RouteQuerySet.as_manager() - + class Meta: unique_together = ('backend', 'host') - + def __str__(self): return "%s@%s" % (self.backend, self.host) - + @cached_property def backend_class(self): return ServiceBackend.get_backend(self.backend) - + def clean(self): if not self.match: self.match = 'True' @@ -244,10 +244,10 @@ class Route(models.Model): except Exception as exception: name = type(exception).__name__ raise ValidationError(': '.join((name, str(exception)))) - + def action_is_async(self, action): return action in self.async_actions - + def matches(self, instance): safe_locals = { 'instance': instance, @@ -255,11 +255,11 @@ class Route(models.Model): instance._meta.model_name: instance, } return eval(self.match, safe_locals) - + def enable(self): self.is_active = True self.save() - + def disable(self): self.is_active = False self.save() diff --git a/orchestra/contrib/orchestration/utils.py b/orchestra/contrib/orchestration/utils.py index 5d9d2886..9e4dd51d 100644 --- a/orchestra/contrib/orchestration/utils.py +++ b/orchestra/contrib/orchestration/utils.py @@ -6,11 +6,11 @@ def retrieve_state(servers): pings = [] for server in servers: address = server.get_address() - ping = run('ping -c 1 -w 1 %s' % address, async=True) + ping = run('ping -c 1 -w 1 %s' % address, run_async=True) pings.append(ping) - uptime = sshrun(address, 'uptime', persist=True, async=True, options={'ConnectTimeout': 1}) + uptime = sshrun(address, 'uptime', persist=True, run_async=True, options={'ConnectTimeout': 1}) uptimes.append(uptime) - + state = {} for server, ping, uptime in zip(servers, pings, uptimes): ping = join(ping, silent=True) @@ -19,7 +19,7 @@ def retrieve_state(servers): ping = '%s ms' % ping.split('/')[4] else: ping = 'Offline' - + uptime = join(uptime, silent=True) uptime_stderr = uptime.stderr.decode() uptime = uptime.stdout.decode().split() @@ -28,5 +28,5 @@ def retrieve_state(servers): else: uptime = '%s' % uptime_stderr state[server.pk] = (ping, uptime) - + return state diff --git a/orchestra/contrib/resources/actions.py b/orchestra/contrib/resources/actions.py index a40e7aa1..ea1ec971 100644 --- a/orchestra/contrib/resources/actions.py +++ b/orchestra/contrib/resources/actions.py @@ -7,14 +7,14 @@ from django.utils.translation import ungettext, ugettext_lazy as _ def run_monitor(modeladmin, request, queryset): """ Resource and ResourceData run monitors """ referer = request.META.get('HTTP_REFERER') - async = modeladmin.model.monitor.__defaults__[0] + run_async = modeladmin.model.monitor.__defaults__[0] logs = set() for resource in queryset: rlogs = resource.monitor() - if not async: + if not run_async: logs = logs.union(set([str(log.pk) for log in rlogs])) modeladmin.log_change(request, resource, _("Run monitors")) - if async: + if run_async: num = len(queryset) # TODO listfilter by uuid: task.request.id + ?task_id__in=ids link = reverse('admin:djcelery_taskstate_changelist') diff --git a/orchestra/contrib/resources/models.py b/orchestra/contrib/resources/models.py index 143d0496..3da69d81 100644 --- a/orchestra/contrib/resources/models.py +++ b/orchestra/contrib/resources/models.py @@ -26,7 +26,7 @@ class Resource(models.Model): Defines a resource, a resource is basically an interpretation of data gathered by a Monitor """ - + LAST = 'LAST' MONTHLY_SUM = 'MONTHLY_SUM' MONTHLY_AVG = 'MONTHLY_AVG' @@ -36,7 +36,7 @@ class Resource(models.Model): (MONTHLY_AVG, _("Monthly avg")), ) _related = set() # keeps track of related models for resource cleanup - + name = models.CharField(_("name"), max_length=32, help_text=_("Required. 32 characters or fewer. Lowercase letters, " "digits and hyphen only."), @@ -70,27 +70,27 @@ class Resource(models.Model): choices=ServiceMonitor.get_choices(), help_text=_("Monitor backends used for monitoring this resource.")) is_active = models.BooleanField(_("active"), default=True) - + objects = ResourceQuerySet.as_manager() - + class Meta: unique_together = ( ('name', 'content_type'), ('verbose_name', 'content_type') ) - + def __str__(self): return "%s-%s" % (self.content_type, self.name) - + @cached_property def aggregation_class(self): return Aggregation.get(self.aggregation) - + @cached_property def aggregation_instance(self): """ Per request lived type_instance """ return self.aggregation_class(self) - + def clean(self): self.verbose_name = self.verbose_name.strip() if self.on_demand and self.default_allocation: @@ -114,12 +114,12 @@ class Resource(models.Model): model_name, ) for error in monitor_errors ]}) - + def save(self, *args, **kwargs): super(Resource, self).save(*args, **kwargs) # This only works on tests (multiprocessing used on real deployments) apps.get_app_config('resources').reload_relations() - + def sync_periodic_task(self, delete=False): """ sync periodic task on save/delete resource operations """ name = 'monitor.%s' % self @@ -140,21 +140,21 @@ class Resource(models.Model): if task.crontab != self.crontab: task.crontab = self.crontab task.save(update_fields=['crontab']) - + def get_model_path(self, monitor): """ returns a model path between self.content_type and monitor.model """ resource_model = self.content_type.model_class() monitor_model = ServiceMonitor.get_backend(monitor).model_class() return get_model_field_path(monitor_model, resource_model) - + def get_scale(self): return eval(self.scale) - + def get_verbose_name(self): return self.verbose_name or self.name - - def monitor(self, async=True): - if async: + + def monitor(self, run_async=True): + if run_async: return tasks.monitor.apply_async(self.pk) return tasks.monitor(self.pk) @@ -187,28 +187,28 @@ class ResourceData(models.Model): allocated = models.PositiveIntegerField(_("allocated"), null=True, blank=True) content_object_repr = models.CharField(_("content object representation"), max_length=256, editable=False) - + content_object = GenericForeignKey() objects = ResourceDataQuerySet.as_manager() - + class Meta: unique_together = ('resource', 'content_type', 'object_id') verbose_name_plural = _("resource data") index_together = ( ('content_type', 'object_id'), ) - + def __str__(self): return "%s: %s" % (self.resource, self.content_object) - + @property def unit(self): return self.resource.unit - + @property def verbose_name(self): return self.resource.verbose_name - + def get_used(self): resource = self.resource total = 0 @@ -220,7 +220,7 @@ class ResourceData(models.Model): has_result = True total += usage return float(total)/resource.get_scale() if has_result else None - + def update(self, current=None): if current is None: current = self.get_used() @@ -228,13 +228,13 @@ class ResourceData(models.Model): self.updated_at = timezone.now() self.content_object_repr = str(self.content_object) self.save(update_fields=('used', 'updated_at', 'content_object_repr')) - - def monitor(self, async=False): + + def monitor(self, run_async=False): ids = (self.object_id,) - if async: + if run_async: return tasks.monitor.delay(self.resource_id, ids=ids) return tasks.monitor(self.resource_id, ids=ids) - + def get_monitor_datasets(self): resource = self.resource for monitor in resource.monitors: @@ -275,20 +275,20 @@ class MonitorData(models.Model): help_text=_("Optional field used to store current state needed for diff-based monitoring.")) content_object_repr = models.CharField(_("content object representation"), max_length=256, editable=False) - + content_object = GenericForeignKey() objects = MonitorDataQuerySet.as_manager() - + class Meta: get_latest_by = 'id' verbose_name_plural = _("monitor data") index_together = ( ('content_type', 'object_id'), ) - + def __str__(self): return str(self.monitor) - + @cached_property def unit(self): return self.resource.unit @@ -324,15 +324,15 @@ def create_resource_relation(): ) self.obj.__resource_cache[attr] = rdata return rdata - + def __get__(self, obj, cls): """ proxy handled object """ self.obj = obj return self - + def __iter__(self): return iter(self.obj.resource_set.all()) - + # Clean previous state for related in Resource._related: try: @@ -344,7 +344,7 @@ def create_resource_relation(): related._meta.private_fields = [ field for field in related._meta.private_fields if field.rel.to != ResourceData ] - + for ct, resources in Resource.objects.group_by('content_type').items(): model = ct.model_class() relation = GenericRelation('resources.ResourceData') diff --git a/orchestra/contrib/resources/tasks.py b/orchestra/contrib/resources/tasks.py index 4b176fda..10f80729 100644 --- a/orchestra/contrib/resources/tasks.py +++ b/orchestra/contrib/resources/tasks.py @@ -36,8 +36,8 @@ def monitor(resource_id, ids=None): for obj in model.objects.filter(**kwargs): op = Operation(backend, obj, Operation.MONITOR) monitorings.append(op) - logs += Operation.execute(monitorings, async=False) - + logs += Operation.execute(monitorings, run_async=False) + kwargs = {'id__in': ids} if ids else {} # Update used resources and trigger resource exceeded and revovery triggers = [] diff --git a/orchestra/contrib/tasks/beat.py b/orchestra/contrib/tasks/beat.py index 81732b6d..7a4772ac 100644 --- a/orchestra/contrib/tasks/beat.py +++ b/orchestra/contrib/tasks/beat.py @@ -23,11 +23,11 @@ def is_due(task, time=None): ) -def run_task(task, thread=True, process=False, async=False): +def run_task(task, thread=True, process=False, run_async=False): args = json.loads(task.args) kwargs = json.loads(task.kwargs) task_fn = current_app.tasks.get(task.task) - if async: + if run_async: method = 'process' if process else 'thread' return apply_async(task_fn, method=method).apply_async(*args, **kwargs) return task_fn(*args, **kwargs) @@ -38,6 +38,6 @@ def run(): procs = [] for task in PeriodicTask.objects.enabled().select_related('crontab'): if is_due(task, now): - proc = run_task(task, process=True, async=True) + proc = run_task(task, process=True, run_async=True) procs.append(proc) [proc.join() for proc in procs] diff --git a/orchestra/contrib/tasks/utils.py b/orchestra/contrib/tasks/utils.py index 19972696..96b5bf0b 100644 --- a/orchestra/contrib/tasks/utils.py +++ b/orchestra/contrib/tasks/utils.py @@ -13,7 +13,7 @@ def get_name(fn): def run(method, *args, **kwargs): - async = kwargs.pop('async', True) + run_async = kwargs.pop('run_async', True) thread = threading.Thread(target=close_connection(method), args=args, kwargs=kwargs) thread = Process(target=close_connection(counter)) thread.start() diff --git a/orchestra/utils/sys.py b/orchestra/utils/sys.py index 07b0fcb4..fc732343 100644 --- a/orchestra/utils/sys.py +++ b/orchestra/utils/sys.py @@ -71,43 +71,43 @@ def runiterator(command, display=False, stdin=b''): """ Subprocess wrapper for running commands concurrently """ if display: sys.stderr.write("\n\033[1m $ %s\033[0m\n" % command) - + p = subprocess.Popen(command, shell=True, executable='/bin/bash', stdout=subprocess.PIPE, stderr=subprocess.PIPE, stdin=subprocess.PIPE) - + p.stdin.write(stdin) p.stdin.close() yield - + make_async(p.stdout) make_async(p.stderr) - + # Async reading of stdout and sterr while True: stdout = b'' stderr = b'' # Get complete unicode chunks select.select([p.stdout, p.stderr], [], []) - + stdoutPiece = read_async(p.stdout) stderrPiece = read_async(p.stderr) - + stdout += (stdoutPiece or b'') #.decode('ascii'), errors='replace') stderr += (stderrPiece or b'') #.decode('ascii'), errors='replace') - + if display and stdout: sys.stdout.write(stdout.decode('utf8')) if display and stderr: sys.stderr.write(stderr.decode('utf8')) - + state = _Attribute(stdout) state.stderr = stderr state.exit_code = p.poll() state.command = command yield state - + if state.exit_code != None: p.stdout.close() p.stderr.close() @@ -121,12 +121,12 @@ def join(iterator, display=False, silent=False, valid_codes=(0,)): for state in iterator: stdout += state.stdout stderr += state.stderr - + exit_code = state.exit_code - + out = _Attribute(stdout.strip()) err = stderr.strip() - + out.failed = False out.exit_code = exit_code out.stderr = err @@ -138,7 +138,7 @@ def join(iterator, display=False, silent=False, valid_codes=(0,)): sys.stderr.write("\n\033[1;31mCommandError: %s %s\033[m\n" % (msg, err)) if not silent: raise CommandError("%s %s" % (msg, err)) - + out.succeeded = not out.failed return out @@ -151,10 +151,10 @@ def joinall(iterators, **kwargs): return results -def run(command, display=False, valid_codes=(0,), silent=False, stdin=b'', async=False): +def run(command, display=False, valid_codes=(0,), silent=False, stdin=b'', run_async=False): iterator = runiterator(command, display, stdin) next(iterator) - if async: + if run_async: return iterator return join(iterator, display=display, silent=silent, valid_codes=valid_codes) @@ -213,7 +213,7 @@ class LockFile(object): self.lockfile = lockfile self.expire = expire self.unlocked = unlocked - + def acquire(self): if os.path.exists(self.lockfile): lock_time = os.path.getmtime(self.lockfile) @@ -222,17 +222,17 @@ class LockFile(object): return False touch(self.lockfile) return True - + def release(self): os.remove(self.lockfile) - + def __enter__(self): if not self.unlocked: if not self.acquire(): raise OperationLocked("%s lock file exists and its mtime is less than %s seconds" % (self.lockfile, self.expire)) return True - + def __exit__(self, type, value, traceback): if not self.unlocked: self.release() @@ -240,4 +240,4 @@ class LockFile(object): def touch_wsgi(delay=0): from . import paths - run('{ sleep %i && touch %s/wsgi.py; } &' % (delay, paths.get_project_dir()), async=True) + run('{ sleep %i && touch %s/wsgi.py; } &' % (delay, paths.get_project_dir()), run_async=True)