core: separate expiry logic from tokens and make re-usable

This commit is contained in:
Jens Langhammer 2020-07-20 10:57:12 +02:00
parent 2be7d3191f
commit c60d1e1f9a
4 changed files with 36 additions and 31 deletions

View file

@ -198,6 +198,31 @@ class UserSourceConnection(CreatedUpdatedModel):
unique_together = (("user", "source"),) unique_together = (("user", "source"),)
class ExpiringModel(models.Model):
"""Base Model which can expire, and is automatically cleaned up."""
expires = models.DateTimeField(default=default_token_duration)
expiring = models.BooleanField(default=True)
@classmethod
def filter_not_expired(cls, **kwargs) -> QuerySet:
"""Filer for tokens which are not expired yet or are not expiring,
and match filters in `kwargs`"""
query = Q(**kwargs)
query_not_expired_yet = Q(expires__lt=now(), expiring=True)
query_not_expiring = Q(expiring=False)
return cls.objects.filter(query & (query_not_expired_yet | query_not_expiring))
@property
def is_expired(self) -> bool:
"""Check if token is expired yet."""
return now() > self.expires
class Meta:
abstract = True
class TokenIntents(models.TextChoices): class TokenIntents(models.TextChoices):
"""Intents a Token can be created for.""" """Intents a Token can be created for."""
@ -208,34 +233,16 @@ class TokenIntents(models.TextChoices):
INTENT_API = "api" INTENT_API = "api"
class Token(models.Model): class Token(ExpiringModel):
"""Token used to authenticate the User for API Access or confirm another Stage like Email.""" """Token used to authenticate the User for API Access or confirm another Stage like Email."""
token_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4) token_uuid = models.UUIDField(primary_key=True, editable=False, default=uuid4)
intent = models.TextField( intent = models.TextField(
choices=TokenIntents.choices, default=TokenIntents.INTENT_VERIFICATION choices=TokenIntents.choices, default=TokenIntents.INTENT_VERIFICATION
) )
expires = models.DateTimeField(default=default_token_duration)
user = models.ForeignKey("User", on_delete=models.CASCADE, related_name="+") user = models.ForeignKey("User", on_delete=models.CASCADE, related_name="+")
expiring = models.BooleanField(default=True)
description = models.TextField(default="", blank=True) description = models.TextField(default="", blank=True)
@staticmethod
def filter_not_expired(**kwargs) -> QuerySet:
"""Filer for tokens which are not expired yet or are not expiring,
and match filters in `kwargs`"""
query = Q(**kwargs)
query_not_expired_yet = Q(expires__lt=now(), expiring=True)
query_not_expiring = Q(expiring=False)
return Token.objects.filter(
query & (query_not_expired_yet | query_not_expiring)
)
@property
def is_expired(self) -> bool:
"""Check if token is expired yet."""
return now() > self.expires
def __str__(self): def __str__(self):
return ( return (
f"Token {self.token_uuid.hex} {self.description} (expires={self.expires})" f"Token {self.token_uuid.hex} {self.description} (expires={self.expires})"

View file

@ -1,15 +1,16 @@
"""passbook core tasks""" """passbook core tasks"""
from django.utils.timezone import now
from structlog import get_logger from structlog import get_logger
from passbook.core.models import Token from passbook.core.models import ExpiringModel
from passbook.root.celery import CELERY_APP from passbook.root.celery import CELERY_APP
LOGGER = get_logger() LOGGER = get_logger()
@CELERY_APP.task() @CELERY_APP.task()
def clean_tokens(): def clean_expired_models():
"""Remove expired tokens""" """Remove expired objects"""
amount, _ = Token.objects.filter(expires__lt=now(), expiring=True).delete() for cls in ExpiringModel.__subclasses__():
LOGGER.debug("Deleted expired tokens", amount=amount) cls: ExpiringModel
amount, _ = cls.filter_not_expired().delete()
LOGGER.debug("Deleted expired models", model=cls, amount=amount)

View file

@ -1,10 +1,10 @@
"""passbook user view tests""" """passbook core task tests"""
from django.test import TestCase from django.test import TestCase
from django.utils.timezone import now from django.utils.timezone import now
from guardian.shortcuts import get_anonymous_user from guardian.shortcuts import get_anonymous_user
from passbook.core.models import Token from passbook.core.models import Token
from passbook.core.tasks import clean_tokens from passbook.core.tasks import clean_expired_models
class TestTasks(TestCase): class TestTasks(TestCase):
@ -14,5 +14,5 @@ class TestTasks(TestCase):
"""Test Token cleanup task""" """Test Token cleanup task"""
Token.objects.create(expires=now(), user=get_anonymous_user()) Token.objects.create(expires=now(), user=get_anonymous_user())
self.assertEqual(Token.objects.all().count(), 1) self.assertEqual(Token.objects.all().count(), 1)
clean_tokens() clean_expired_models()
self.assertEqual(Token.objects.all().count(), 0) self.assertEqual(Token.objects.all().count(), 0)

View file

@ -33,9 +33,6 @@ class SourceTypeManager:
self.__source_types[kind.value] = {} self.__source_types[kind.value] = {}
self.__source_types[kind.value][slugify(name)] = cls self.__source_types[kind.value][slugify(name)] = cls
self.__names.append(name) self.__names.append(name)
LOGGER.debug(
"Registered source", source_class=cls.__name__, kind=kind.value
)
return cls return cls
return inner_wrapper return inner_wrapper