policies/reputation: implement MonitoredTask

This commit is contained in:
Jens Langhammer 2020-10-16 14:20:24 +02:00
parent 4b3e0f0f96
commit e4f45eba0a
2 changed files with 17 additions and 12 deletions

View File

@ -3,6 +3,7 @@ from django.core.cache import cache
from structlog import get_logger from structlog import get_logger
from passbook.core.models import User from passbook.core.models import User
from passbook.lib.tasks import MonitoredTask, TaskResult, TaskResultStatus
from passbook.policies.reputation.models import IPReputation, UserReputation from passbook.policies.reputation.models import IPReputation, UserReputation
from passbook.policies.reputation.signals import ( from passbook.policies.reputation.signals import (
CACHE_KEY_IP_PREFIX, CACHE_KEY_IP_PREFIX,
@ -13,27 +14,26 @@ from passbook.root.celery import CELERY_APP
LOGGER = get_logger() LOGGER = get_logger()
@CELERY_APP.task() @CELERY_APP.task(bind=True, base=MonitoredTask)
def save_ip_reputation(): def save_ip_reputation(self: MonitoredTask):
"""Save currently cached reputation to database""" """Save currently cached reputation to database"""
keys = cache.keys(CACHE_KEY_IP_PREFIX + "*")
objects_to_update = [] objects_to_update = []
for key in keys: for key, score in cache.get_many(CACHE_KEY_IP_PREFIX + "*").items():
score = cache.get(key)
remote_ip = key.replace(CACHE_KEY_IP_PREFIX, "") remote_ip = key.replace(CACHE_KEY_IP_PREFIX, "")
rep, _ = IPReputation.objects.get_or_create(ip=remote_ip) rep, _ = IPReputation.objects.get_or_create(ip=remote_ip)
rep.score = score rep.score = score
objects_to_update.append(rep) objects_to_update.append(rep)
IPReputation.objects.bulk_update(objects_to_update, ["score"]) IPReputation.objects.bulk_update(objects_to_update, ["score"])
self.set_status(
TaskResult(TaskResultStatus.SUCCESSFUL, ["Successfully updated IP Reputation"])
)
@CELERY_APP.task() @CELERY_APP.task(bind=True, base=MonitoredTask)
def save_user_reputation(): def save_user_reputation(self: MonitoredTask):
"""Save currently cached reputation to database""" """Save currently cached reputation to database"""
keys = cache.keys(CACHE_KEY_USER_PREFIX + "*")
objects_to_update = [] objects_to_update = []
for key in keys: for key, score in cache.get_many(CACHE_KEY_USER_PREFIX + "*").items():
score = cache.get(key)
username = key.replace(CACHE_KEY_USER_PREFIX, "") username = key.replace(CACHE_KEY_USER_PREFIX, "")
users = User.objects.filter(username=username) users = User.objects.filter(username=username)
if not users.exists(): if not users.exists():
@ -43,3 +43,8 @@ def save_user_reputation():
rep.score = score rep.score = score
objects_to_update.append(rep) objects_to_update.append(rep)
UserReputation.objects.bulk_update(objects_to_update, ["score"]) UserReputation.objects.bulk_update(objects_to_update, ["score"])
self.set_status(
TaskResult(
TaskResultStatus.SUCCESSFUL, ["Successfully updated User Reputation"]
)
)

View File

@ -33,7 +33,7 @@ class TestReputationPolicy(TestCase):
# Test value in cache # Test value in cache
self.assertEqual(cache.get(CACHE_KEY_IP_PREFIX + self.test_ip), -1) self.assertEqual(cache.get(CACHE_KEY_IP_PREFIX + self.test_ip), -1)
# Save cache and check db values # Save cache and check db values
save_ip_reputation() save_ip_reputation.delay()
self.assertEqual(IPReputation.objects.get(ip=self.test_ip).score, -1) self.assertEqual(IPReputation.objects.get(ip=self.test_ip).score, -1)
def test_user_reputation(self): def test_user_reputation(self):
@ -43,7 +43,7 @@ class TestReputationPolicy(TestCase):
# Test value in cache # Test value in cache
self.assertEqual(cache.get(CACHE_KEY_USER_PREFIX + self.test_username), -1) self.assertEqual(cache.get(CACHE_KEY_USER_PREFIX + self.test_username), -1)
# Save cache and check db values # Save cache and check db values
save_user_reputation() save_user_reputation.delay()
self.assertEqual(UserReputation.objects.get(user=self.user).score, -1) self.assertEqual(UserReputation.objects.get(user=self.user).score, -1)
def test_policy(self): def test_policy(self):