From 2c122935b3f2a3e47c7c23fd57fadc31be9ca270 Mon Sep 17 00:00:00 2001 From: Marc Aymerich Date: Wed, 6 May 2015 15:32:22 +0000 Subject: [PATCH] Fixed backend script name colision --- orchestra/contrib/domains/backends.py | 4 +-- orchestra/contrib/mailboxes/validators.py | 9 +++-- orchestra/contrib/orchestration/__init__.py | 4 +-- .../management/commands/orchestrate.py | 12 +++---- orchestra/contrib/orchestration/manager.py | 33 ++++++++++--------- orchestra/contrib/orchestration/methods.py | 2 ++ orchestra/contrib/orchestration/models.py | 14 ++++---- .../contrib/orchestration/tests/test_route.py | 6 ++-- orchestra/utils/python.py | 3 ++ 9 files changed, 47 insertions(+), 40 deletions(-) diff --git a/orchestra/contrib/domains/backends.py b/orchestra/contrib/domains/backends.py index 64de9789..c666ade3 100644 --- a/orchestra/contrib/domains/backends.py +++ b/orchestra/contrib/domains/backends.py @@ -98,8 +98,8 @@ class Bind9MasterDomainBackend(ServiceController): from orchestra.contrib.orchestration.manager import router operation = Operation(backend, domain, Operation.SAVE) servers = [] - for server in router.get_servers(operation): - servers.append(server.get_ip()) + for routes in router.get_routes(operation): + servers.append(route.host.get_ip()) return servers def get_masters_ips(self, domain): diff --git a/orchestra/contrib/mailboxes/validators.py b/orchestra/contrib/mailboxes/validators.py index 88dfce7d..ce547a77 100644 --- a/orchestra/contrib/mailboxes/validators.py +++ b/orchestra/contrib/mailboxes/validators.py @@ -51,14 +51,17 @@ def validate_forward(value): def validate_sieve(value): sieve_name = '%s.sieve' % hashlib.md5(value.encode('utf8')).hexdigest() - path = os.path.join(settings.MAILBOXES_SIEVETEST_PATH, sieve_name) - with open(path, 'w') as f: + test_path = os.path.join(settings.MAILBOXES_SIEVETEST_PATH, sieve_name) + with open(test_path, 'w') as f: f.write(value) context = { 'orchestra_root': paths.get_orchestra_dir() } sievetest = settings.MAILBOXES_SIEVETEST_BIN_PATH % context - test = run(' '.join([sievetest, path, '/dev/null']), silent=True) + try: + test = run(' '.join([sievetest, test_path, '/dev/null']), silent=True) + finally: + os.unlink(test_path) if test.return_code: errors = [] for line in test.stderr.decode('utf8').splitlines(): diff --git a/orchestra/contrib/orchestration/__init__.py b/orchestra/contrib/orchestration/__init__.py index c4d3945d..89d0113b 100644 --- a/orchestra/contrib/orchestration/__init__.py +++ b/orchestra/contrib/orchestration/__init__.py @@ -21,13 +21,13 @@ class Operation(): """ set() """ return hash(self) == hash(operation) - def __init__(self, backend, instance, action, servers=None): + def __init__(self, backend, instance, action, routes=None): self.backend = backend # instance should maintain any dynamic attribute until backend execution # deep copy is prefered over copy otherwise objects will share same atributes (queryset cache) self.instance = copy.deepcopy(instance) self.action = action - self.servers = servers + self.routes = routes @classmethod def execute(cls, operations, async=False): diff --git a/orchestra/contrib/orchestration/management/commands/orchestrate.py b/orchestra/contrib/orchestration/management/commands/orchestrate.py index 87aa1974..69fc11fc 100644 --- a/orchestra/contrib/orchestration/management/commands/orchestrate.py +++ b/orchestra/contrib/orchestration/management/commands/orchestrate.py @@ -4,7 +4,7 @@ from django.apps import apps from orchestra.contrib.orchestration import manager, Operation from orchestra.contrib.orchestration.models import Server from orchestra.contrib.orchestration.backends import ServiceBackend -from orchestra.utils.python import import_class, OrderedSet +from orchestra.utils.python import import_class, OrderedSet, AttrDict class Command(BaseCommand): @@ -53,7 +53,7 @@ class Command(BaseCommand): if servers: servers = servers.split(',') backends = backends.split(',') - server_objects = [] + routes = [] # Get and create missing Servers for server in servers: try: @@ -62,12 +62,12 @@ class Command(BaseCommand): server = Server(name=server, address=server) server.full_clean() server.save() - server_objects.append(server) + routes.append(AttrDict(host=server, async=False)) # Generate operations for the given backend for instance in queryset: for backend in backends: backend = import_class(backend) - operations.add(Operation(backend, instance, action, servers=server_objects)) + operations.add(Operation(backend, instance, action, routes=routes)) else: for instance in queryset: manager.collect(instance, action, operations=operations, route_cache=route_cache) @@ -75,9 +75,9 @@ class Command(BaseCommand): servers = [] # Print scripts for key, value in scripts.items(): - server, __ = key + route, __ = key backend, operations = value - servers.append(server.name) + servers.append(str(route.host)) self.stdout.write('# Execute on %s' % server.name) for method, commands in backend.scripts: script = '\n'.join(commands) diff --git a/orchestra/contrib/orchestration/manager.py b/orchestra/contrib/orchestration/manager.py index b708fa4a..9cc86d70 100644 --- a/orchestra/contrib/orchestration/manager.py +++ b/orchestra/contrib/orchestration/manager.py @@ -23,7 +23,8 @@ router = import_class(settings.ORCHESTRATION_ROUTER) def as_task(execute, log, operations): def wrapper(*args, **kwargs): """ send report """ - # Tasks run on a separate transaction pool (thread), no need to temper with the transaction + # Remember that threads have their oun connection poll + # No need to EVER temper with the transaction here try: log = execute(*args, **kwargs) if log.state != log.SUCCESS: @@ -39,7 +40,7 @@ def as_task(execute, log, operations): log.save(update_fields=('state', 'stderr')) # We don't propagate the exception further to avoid transaction rollback finally: - # Store the operation + # Store and log the operation for operation in operations: logger.info("Executed %s" % str(operation)) if operation.instance.pk: @@ -56,13 +57,13 @@ def generate(operations): scripts = OrderedDict() cache = {} block = False - # Generate scripts per server+backend + # Generate scripts per route+backend for operation in operations: logger.debug("Queued %s" % str(operation)) - if operation.servers is None: - operation.servers = router.get_servers(operation, cache=cache) - for server in operation.servers: - key = (server, operation.backend) + if operation.routes is None: + operation.routes = router.get_routes(operation, cache=cache) + for route in operation.routes: + key = (route, operation.backend) if key not in scripts: backend, operations = (operation.backend(), [operation]) scripts[key] = (backend, operations) @@ -106,16 +107,16 @@ def execute(scripts, block=False, async=False): threads_to_join = [] logs = [] for key, value in scripts.items(): - server, __ = key + route, __ = key backend, operations = value - args = (server,) + args = (route.host,) kwargs = { - 'async': async or server.async + 'async': async or route.async } log = backend.create_log(*args, **kwargs) kwargs['log'] = log task = as_task(backend.execute, log, operations) - logger.debug('%s is going to be executed on %s' % (backend, server)) + logger.debug('%s is going to be executed on %s' % (backend, route.host)) if block: # Execute one backend at a time, no need for threads task(*args, **kwargs) @@ -123,7 +124,7 @@ def execute(scripts, block=False, async=False): task = close_connection(task) thread = threading.Thread(target=task, args=args, kwargs=kwargs) thread.start() - if not server.async: + if not route.async: threads_to_join.append(thread) logs.append(log) [ thread.join() for thread in threads_to_join ] @@ -182,10 +183,10 @@ def collect(instance, action, **kwargs): if not execute: continue operation = Operation(backend_cls, selected, iaction) - # Only schedule operations if the router gives servers to execute into - servers = router.get_servers(operation, cache=route_cache) - if servers: - operation.servers = servers + # Only schedule operations if the router has execution routes + routes = router.get_routes(operation, cache=route_cache) + if routes: + operation.routes = routes if iaction != Operation.DELETE: # usually we expect to be using last object state, # except when we are deleting it diff --git a/orchestra/contrib/orchestration/methods.py b/orchestra/contrib/orchestration/methods.py index aa99cdb5..6141dcb5 100644 --- a/orchestra/contrib/orchestration/methods.py +++ b/orchestra/contrib/orchestration/methods.py @@ -33,6 +33,8 @@ def SSH(backend, log, server, cmds, async=False): digest = hashlib.md5(bscript).hexdigest() path = os.path.join(settings.ORCHESTRATION_TEMP_SCRIPT_DIR, digest) remote_path = "%s.remote" % path + # Ensure unique local paths for each file because of problems when os.remove(path) + path += '@%s' % str(server) log.state = log.STARTED log.script = '# %s\n%s' % (remote_path, script) log.save(update_fields=('script', 'state')) diff --git a/orchestra/contrib/orchestration/models.py b/orchestra/contrib/orchestration/models.py index 29db550f..0b79680b 100644 --- a/orchestra/contrib/orchestration/models.py +++ b/orchestra/contrib/orchestration/models.py @@ -150,9 +150,8 @@ class Route(models.Model): def backend_class(self): return ServiceBackend.get_backend(self.backend) - # TODO rename to get_hosts @classmethod - def get_servers(cls, operation, **kwargs): + def get_routes(cls, operation, **kwargs): cache = kwargs.get('cache', {}) if not cache: for route in cls.objects.filter(is_active=True).select_related('host'): @@ -162,19 +161,18 @@ class Route(models.Model): cache[key].append(route) except KeyError: cache[key] = [route] - servers = [] + routes = [] backend_cls = operation.backend key = (backend_cls.get_name(), operation.action) try: - routes = cache[key] + target_routes = cache[key] except KeyError: pass else: - for route in routes: + for route in target_routes: if route.matches(operation.instance): - route.host.async = route.async - servers.append(route.host) - return servers + routes.append(route) + return routes def clean(self): if not self.match: diff --git a/orchestra/contrib/orchestration/tests/test_route.py b/orchestra/contrib/orchestration/tests/test_route.py index f8f1f9de..285105ba 100644 --- a/orchestra/contrib/orchestration/tests/test_route.py +++ b/orchestra/contrib/orchestration/tests/test_route.py @@ -30,12 +30,12 @@ class RouterTests(BaseTestCase): route = Route.objects.create(backend=backend, host=self.host, match='True') operation = Operation(backend=TestBackend, instance=route, action='save') - self.assertEqual(1, len(Route.get_servers(operation))) + self.assertEqual(1, len(Route.get_routes(operation))) route = Route.objects.create(backend=backend, host=self.host1, match='route.backend == "%s"' % TestBackend.get_name()) - self.assertEqual(2, len(Route.get_servers(operation))) + self.assertEqual(2, len(Route.get_routes(operation))) route = Route.objects.create(backend=backend, host=self.host2, match='route.backend == "something else"') - self.assertEqual(2, len(Route.get_servers(operation))) + self.assertEqual(2, len(Route.get_routes(operation))) diff --git a/orchestra/utils/python.py b/orchestra/utils/python.py index a4493827..4c610e71 100644 --- a/orchestra/utils/python.py +++ b/orchestra/utils/python.py @@ -83,6 +83,9 @@ class AttrDict(dict): def __init__(self, *args, **kwargs): super(AttrDict, self).__init__(*args, **kwargs) self.__dict__ = self + + def __hash__(self): + return hash(id(self)) class CaptureStdout(list):