diff --git a/passbook/core/policies.py b/passbook/core/policies.py index 1c706f165..eaa9156f0 100644 --- a/passbook/core/policies.py +++ b/passbook/core/policies.py @@ -1,7 +1,9 @@ """passbook core policy engine""" from logging import getLogger +from amqp.exceptions import UnexpectedFrame from celery import group +from celery.exceptions import TimeoutError as CeleryTimeoutError from ipware import get_client_ip from passbook.core.celery import CELERY_APP @@ -38,6 +40,7 @@ class PolicyEngine: _group = None _request = None _user = None + _get_timeout = 0 def __init__(self, policies): self.policies = policies @@ -67,7 +70,14 @@ class PolicyEngine: if not kwargs['remote_ip']: kwargs['remote_ip'] = '255.255.255.255' for policy in self.policies: - signatures.append(_policy_engine_task.s(self._user.pk, policy.pk.hex, **kwargs)) + signatures.append(_policy_engine_task.signature( + args=(self._user.pk, policy.pk.hex), + kwargs=kwargs, + time_limit=policy.timeout)) + self._get_timeout += policy.timeout + self._get_timeout += 3 + self._get_timeout = (self._get_timeout / len(self.policies)) * 1.5 + LOGGER.debug("Set total policy timeout to %r", self._get_timeout) self._group = group(signatures)() return self @@ -76,10 +86,14 @@ class PolicyEngine: """Get policy-checking result""" messages = [] try: - # ValueError can be thrown from _policy_engine_task when user is None - group_result = self._group.get() + group_result = self._group.get(timeout=self._get_timeout) except ValueError as exc: - return False, str(exc) + # ValueError can be thrown from _policy_engine_task when user is None + return False, [str(exc)] + except UnexpectedFrame as exc: + return False, [str(exc)] + except CeleryTimeoutError as exc: + return False, [str(exc)] for policy_action, policy_result, policy_message in group_result: passing = (policy_action == Policy.ACTION_ALLOW and policy_result) or \ (policy_action == Policy.ACTION_DENY and not policy_result)