1cfe1aff13
* root: initial rename * web: rename custom element prefix * root: rename external functions with pb_ prefix * root: fix formatting * root: replace domain with goauthentik.io * proxy: update path * root: rename remaining prefixes * flows: rename file extension * root: pbadmin -> akadmin * docs: fix image filenames * lifecycle: ignore migration files * ci: copy default config from current source before loading last tagged * *: new sentry dsn * tests: fix missing python3.9-dev package * root: add additional migrations for service accounts created by outposts * core: mark system-created service accounts with attribute * policies/expression: fix pb_ replacement not working * web: fix last linting errors, add lit-analyse * policies/expressions: fix lint errors * web: fix sidebar display on screens where not all items fit * proxy: attempt to fix proxy pipeline * proxy: use go env GOPATH to get gopath * lib: fix user_default naming inconsistency * docs: add upgrade docs * docs: update screenshots to use authentik * admin: fix create button on empty-state of outpost * web: fix modal submit not refreshing SiteShell and Table * web: fix height of app-card and height of generic icon * web: fix rendering of subtext * admin: fix version check error not being caught * web: fix worker count not being shown * docs: update screenshots * root: new icon * web: fix lint error * admin: fix linting error * root: migrate coverage config to pyproject
56 lines
2.1 KiB
Python
56 lines
2.1 KiB
Python
"""test reputation signals and policy"""
|
|
from django.contrib.auth import authenticate
|
|
from django.core.cache import cache
|
|
from django.test import TestCase
|
|
|
|
from authentik.core.models import User
|
|
from authentik.policies.reputation.models import (
|
|
CACHE_KEY_IP_PREFIX,
|
|
CACHE_KEY_USER_PREFIX,
|
|
IPReputation,
|
|
ReputationPolicy,
|
|
UserReputation,
|
|
)
|
|
from authentik.policies.reputation.tasks import save_ip_reputation, save_user_reputation
|
|
from authentik.policies.types import PolicyRequest
|
|
|
|
|
|
class TestReputationPolicy(TestCase):
    """Checks for the reputation cache entries, their persistence and the policy"""

    def setUp(self):
        self.test_ip = "255.255.255.255"
        self.test_username = "test"
        # Drop any cached score left for this IP / username before each test
        for stale_key in (
            CACHE_KEY_IP_PREFIX + self.test_ip,
            CACHE_KEY_USER_PREFIX + self.test_username,
        ):
            cache.delete(stale_key)
        # The one-to-one in userreputation needs an existing user
        self.user = User.objects.create(username=self.test_username)

    def _attempt_login(self):
        """Call authenticate() without a request, using the username as password"""
        authenticate(None, username=self.test_username, password=self.test_username)

    def test_ip_reputation(self):
        """IP score shows up in the cache and, after the save task, in the DB"""
        ip_cache_key = CACHE_KEY_IP_PREFIX + self.test_ip
        # This login attempt is what should produce the negative score
        self._attempt_login()
        # The cache must now hold the lowered score
        cached_score = cache.get(ip_cache_key)
        self.assertEqual(cached_score, -1)
        # Run the save task, wait for it, then compare the stored row
        pending = save_ip_reputation.delay()
        pending.get()
        stored = IPReputation.objects.get(ip=self.test_ip)
        self.assertEqual(stored.score, -1)

    def test_user_reputation(self):
        """User score shows up in the cache and, after the save task, in the DB"""
        user_cache_key = CACHE_KEY_USER_PREFIX + self.test_username
        # This login attempt is what should produce the negative score
        self._attempt_login()
        # The cache must now hold the lowered score
        cached_score = cache.get(user_cache_key)
        self.assertEqual(cached_score, -1)
        # Run the save task, wait for it, then compare the stored row
        pending = save_user_reputation.delay()
        pending.get()
        stored = UserReputation.objects.get(user=self.user)
        self.assertEqual(stored.score, -1)

    def test_policy(self):
        """A policy created with threshold 0 passes for the test user"""
        policy_request = PolicyRequest(user=self.user)
        reputation_policy: ReputationPolicy = ReputationPolicy.objects.create(
            name="reputation-test", threshold=0
        )
        outcome = reputation_policy.passes(policy_request)
        self.assertTrue(outcome.passing)
|