root: use single redis db (#4009)

* use single redis db

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* cleanup prefixes

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* ensure __str__ always returns string

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* fix remaining old prefixes

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

* add release notes

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
Jens L 2022-11-15 14:31:29 +01:00 committed by GitHub
parent 9f269faf53
commit 55aa1897af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 60 additions and 63 deletions

View File

@ -197,7 +197,4 @@ dev-reset:
dropdb -U postgres -h localhost authentik
createdb -U postgres -h localhost authentik
redis-cli -n 0 flushall
redis-cli -n 1 flushall
redis-cli -n 2 flushall
redis-cli -n 3 flushall
make migrate

View File

@ -37,7 +37,7 @@ LOGGER = get_logger()
def user_app_cache_key(user_pk: str) -> str:
"""Cache key where application list for user is saved"""
return f"user_app_cache_{user_pk}"
return f"goauthentik.io/core/app_access/{user_pk}"
class ApplicationSerializer(ModelSerializer):

View File

@ -297,7 +297,7 @@ class Provider(SerializerModel):
raise NotImplementedError
def __str__(self):
return self.name
return str(self.name)
class Application(SerializerModel, PolicyBindingModel):
@ -379,7 +379,7 @@ class Application(SerializerModel, PolicyBindingModel):
return None
def __str__(self):
return self.name
return str(self.name)
class Meta:
@ -481,7 +481,7 @@ class Source(ManagedModel, SerializerModel, PolicyBindingModel):
return None
def __str__(self):
return self.name
return str(self.name)
class Meta:

View File

@ -293,7 +293,7 @@ class Event(SerializerModel, ExpiringModel):
return f"{self.action}: {self.context}"
def __str__(self) -> str:
return f"<Event action={self.action} user={self.user} context={self.context}>"
return f"Event action={self.action} user={self.user} context={self.context}"
class Meta:

View File

@ -15,6 +15,7 @@ from authentik.events.models import Event, EventAction
from authentik.lib.utils.errors import exception_to_string
LOGGER = get_logger()
CACHE_KEY_PREFIX = "goauthentik.io/events/tasks/"
class TaskResultStatus(Enum):
@ -70,16 +71,16 @@ class TaskInfo:
@staticmethod
def all() -> dict[str, "TaskInfo"]:
"""Get all TaskInfo objects"""
return cache.get_many(cache.keys("task_*"))
return cache.get_many(cache.keys(CACHE_KEY_PREFIX + "*"))
@staticmethod
def by_name(name: str) -> Optional["TaskInfo"]:
"""Get TaskInfo Object by name"""
return cache.get(f"task_{name}", None)
return cache.get(CACHE_KEY_PREFIX + name, None)
def delete(self):
"""Delete task info from cache"""
return cache.delete(f"task_{self.task_name}")
return cache.delete(CACHE_KEY_PREFIX + self.task_name)
def set_prom_metrics(self):
"""Update prometheus metrics"""
@ -98,9 +99,9 @@ class TaskInfo:
def save(self, timeout_hours=6):
"""Save task into cache"""
key = f"task_{self.task_name}"
key = CACHE_KEY_PREFIX + self.task_name
if self.result.uid:
key += f"_{self.result.uid}"
key += f"/{self.result.uid}"
self.task_name += f"_{self.result.uid}"
self.set_prom_metrics()
cache.set(key, self, timeout=timeout_hours * 60 * 60)

View File

@ -31,7 +31,7 @@ CACHE_TIMEOUT = int(CONFIG.y("redis.cache_timeout_flows"))
def cache_key(flow: Flow, user: Optional[User] = None) -> str:
"""Generate Cache key for flow"""
prefix = f"flow_{flow.pk}"
prefix = f"goauthentik.io/flows/planner/{flow.pk}"
if user:
prefix += f"#{user.pk}"
return prefix

View File

@ -19,10 +19,7 @@ redis:
password: ''
tls: false
tls_reqs: "none"
cache_db: 0
message_queue_db: 1
ws_db: 2
outpost_session_db: 3
db: 0
cache_timeout: 300
cache_timeout_flows: 300
cache_timeout_policies: 300

View File

@ -292,7 +292,7 @@ class Outpost(SerializerModel, ManagedModel):
@property
def state_cache_prefix(self) -> str:
"""Key by which the outposts status is saved"""
return f"outpost_{self.uuid.hex}_state"
return f"goauthentik.io/outposts/{self.uuid.hex}_state"
@property
def state(self) -> list["OutpostState"]:

View File

@ -41,6 +41,9 @@ class PolicyBindingModel(models.Model):
objects = InheritanceManager()
def __str__(self) -> str:
return f"PolicyBindingModel {self.pbm_uuid}"
class Meta:
verbose_name = _("Policy Binding Model")
verbose_name_plural = _("Policy Binding Models")
@ -135,6 +138,7 @@ class PolicyBinding(SerializerModel):
return f"Binding from {self.target} #{self.order} to {suffix}"
except PolicyBinding.target.RelatedObjectDoesNotExist: # pylint: disable=no-member
return f"Binding - #{self.order} to {suffix}"
return ""
class Meta:
@ -175,7 +179,7 @@ class Policy(SerializerModel, CreatedUpdatedModel):
raise NotImplementedError
def __str__(self):
return self.name
return str(self.name)
def passes(self, request: PolicyRequest) -> PolicyResult: # pragma: no cover
"""Check if request passes this policy"""

View File

@ -14,7 +14,7 @@ from authentik.lib.utils.errors import exception_to_string
from authentik.policies.apps import HIST_POLICIES_EXECUTION_TIME
from authentik.policies.exceptions import PolicyException
from authentik.policies.models import PolicyBinding
from authentik.policies.types import PolicyRequest, PolicyResult
from authentik.policies.types import CACHE_PREFIX, PolicyRequest, PolicyResult
LOGGER = get_logger()
@ -25,7 +25,7 @@ PROCESS_CLASS = FORK_CTX.Process
def cache_key(binding: PolicyBinding, request: PolicyRequest) -> str:
"""Generate Cache key for policy"""
prefix = f"policy_{binding.policy_binding_uuid.hex}_"
prefix = f"{CACHE_PREFIX}{binding.policy_binding_uuid.hex}_"
if request.http_request and hasattr(request.http_request, "session"):
prefix += f"_{request.http_request.session.session_key}"
if request.user:

View File

@ -23,12 +23,12 @@ def update_score(request: HttpRequest, identifier: str, amount: int):
try:
# We only update the cache here, as its faster than writing to the DB
score = cache.get_or_set(
CACHE_KEY_PREFIX + remote_ip + identifier,
CACHE_KEY_PREFIX + remote_ip + "/" + identifier,
{"ip": remote_ip, "identifier": identifier, "score": 0},
CACHE_TIMEOUT,
)
score["score"] += amount
cache.set(CACHE_KEY_PREFIX + remote_ip + identifier, score)
cache.set(CACHE_KEY_PREFIX + remote_ip + "/" + identifier, score)
except ValueError as exc:
LOGGER.warning("failed to set reputation", exc=exc)

View File

@ -32,7 +32,7 @@ class TestReputationPolicy(TestCase):
)
# Test value in cache
self.assertEqual(
cache.get(CACHE_KEY_PREFIX + self.test_ip + self.test_username),
cache.get(CACHE_KEY_PREFIX + self.test_ip + "/" + self.test_username),
{"ip": "127.0.0.1", "identifier": "test", "score": -1},
)
# Save cache and check db values
@ -47,7 +47,7 @@ class TestReputationPolicy(TestCase):
)
# Test value in cache
self.assertEqual(
cache.get(CACHE_KEY_PREFIX + self.test_ip + self.test_username),
cache.get(CACHE_KEY_PREFIX + self.test_ip + "/" + self.test_username),
{"ip": "127.0.0.1", "identifier": "test", "score": -1},
)
# Save cache and check db values

View File

@ -6,6 +6,7 @@ from structlog.stdlib import get_logger
from authentik.core.api.applications import user_app_cache_key
from authentik.policies.apps import GAUGE_POLICIES_CACHED
from authentik.policies.types import CACHE_PREFIX
from authentik.root.monitoring import monitoring_set
LOGGER = get_logger()
@ -15,7 +16,7 @@ LOGGER = get_logger()
# pylint: disable=unused-argument
def monitoring_set_policies(sender, **kwargs):
"""set policy gauges"""
GAUGE_POLICIES_CACHED.set(len(cache.keys("policy_*") or []))
GAUGE_POLICIES_CACHED.set(len(cache.keys(f"{CACHE_PREFIX}_*") or []))
@receiver(post_save)
@ -27,7 +28,7 @@ def invalidate_policy_cache(sender, instance, **_):
if isinstance(instance, Policy):
total = 0
for binding in PolicyBinding.objects.filter(policy=instance):
prefix = f"policy_{binding.policy_binding_uuid.hex}_{binding.policy.pk.hex}*"
prefix = f"{CACHE_PREFIX}{binding.policy_binding_uuid.hex}_{binding.policy.pk.hex}*"
keys = cache.keys(prefix)
total += len(keys)
cache.delete_many(keys)

View File

@ -8,6 +8,7 @@ from authentik.policies.engine import PolicyEngine
from authentik.policies.expression.models import ExpressionPolicy
from authentik.policies.models import Policy, PolicyBinding, PolicyBindingModel, PolicyEngineMode
from authentik.policies.tests.test_process import clear_policy_cache
from authentik.policies.types import CACHE_PREFIX
class TestPolicyEngine(TestCase):
@ -101,8 +102,8 @@ class TestPolicyEngine(TestCase):
pbm = PolicyBindingModel.objects.create()
binding = PolicyBinding.objects.create(target=pbm, policy=self.policy_false, order=0)
engine = PolicyEngine(pbm, self.user)
self.assertEqual(len(cache.keys(f"policy_{binding.policy_binding_uuid.hex}*")), 0)
self.assertEqual(len(cache.keys(f"{CACHE_PREFIX}{binding.policy_binding_uuid.hex}*")), 0)
self.assertEqual(engine.build().passing, False)
self.assertEqual(len(cache.keys(f"policy_{binding.policy_binding_uuid.hex}*")), 1)
self.assertEqual(len(cache.keys(f"{CACHE_PREFIX}{binding.policy_binding_uuid.hex}*")), 1)
self.assertEqual(engine.build().passing, False)
self.assertEqual(len(cache.keys(f"policy_{binding.policy_binding_uuid.hex}*")), 1)
self.assertEqual(len(cache.keys(f"{CACHE_PREFIX}{binding.policy_binding_uuid.hex}*")), 1)

View File

@ -16,6 +16,7 @@ if TYPE_CHECKING:
from authentik.policies.models import PolicyBinding
LOGGER = get_logger()
CACHE_PREFIX = "goauthentik.io/policies/"
@dataclass

View File

@ -2,6 +2,8 @@
from channels.generic.websocket import JsonWebsocketConsumer
from django.core.cache import cache
from authentik.root.messages.storage import CACHE_PREFIX
class MessageConsumer(JsonWebsocketConsumer):
"""Consumer which sends django.contrib.messages Messages over WS.
@ -12,11 +14,13 @@ class MessageConsumer(JsonWebsocketConsumer):
def connect(self):
self.accept()
self.session_key = self.scope["session"].session_key
cache.set(f"user_{self.session_key}_messages_{self.channel_name}", True, None)
if not self.session_key:
return
cache.set(f"{CACHE_PREFIX}{self.session_key}_messages_{self.channel_name}", True, None)
# pylint: disable=unused-argument
def disconnect(self, code):
cache.delete(f"user_{self.session_key}_messages_{self.channel_name}")
cache.delete(f"{CACHE_PREFIX}{self.session_key}_messages_{self.channel_name}")
def event_update(self, event: dict):
"""Event handler which is called by Messages Storage backend"""

View File

@ -7,6 +7,7 @@ from django.core.cache import cache
from django.http.request import HttpRequest
SESSION_KEY = "_messages"
CACHE_PREFIX = "goauthentik.io/root/messages_"
class ChannelsStorage(SessionStorage):
@ -18,7 +19,7 @@ class ChannelsStorage(SessionStorage):
self.channel = get_channel_layer()
def _store(self, messages: list[Message], response, *args, **kwargs):
prefix = f"user_{self.request.session.session_key}_messages_"
prefix = f"{CACHE_PREFIX}{self.request.session.session_key}_messages_"
keys = cache.keys(f"{prefix}*")
# if no active connections are open, fallback to storing messages in the
# session, so they can always be retrieved

View File

@ -195,9 +195,10 @@ _redis_url = (
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": f"{_redis_url}/{CONFIG.y('redis.cache_db')}",
"LOCATION": f"{_redis_url}/{CONFIG.y('redis.db')}",
"TIMEOUT": int(CONFIG.y("redis.cache_timeout", 300)),
"OPTIONS": {"CLIENT_CLASS": "django_redis.client.DefaultClient"},
"KEY_PREFIX": "authentik_cache",
}
}
DJANGO_REDIS_SCAN_ITERSIZE = 1000
@ -255,7 +256,8 @@ CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [f"{_redis_url}/{CONFIG.y('redis.ws_db')}"],
"hosts": [f"{_redis_url}/{CONFIG.y('redis.db')}"],
"prefix": "authentik_channels",
},
},
}
@ -338,12 +340,8 @@ CELERY_BEAT_SCHEDULE = {
}
CELERY_TASK_CREATE_MISSING_QUEUES = True
CELERY_TASK_DEFAULT_QUEUE = "authentik"
CELERY_BROKER_URL = (
f"{_redis_url}/{CONFIG.y('redis.message_queue_db')}{REDIS_CELERY_TLS_REQUIREMENTS}"
)
CELERY_RESULT_BACKEND = (
f"{_redis_url}/{CONFIG.y('redis.message_queue_db')}{REDIS_CELERY_TLS_REQUIREMENTS}"
)
CELERY_BROKER_URL = f"{_redis_url}/{CONFIG.y('redis.db')}{REDIS_CELERY_TLS_REQUIREMENTS}"
CELERY_RESULT_BACKEND = f"{_redis_url}/{CONFIG.y('redis.db')}{REDIS_CELERY_TLS_REQUIREMENTS}"
# Sentry integration
env = get_env()

View File

@ -166,7 +166,7 @@ class LDAPPropertyMapping(PropertyMapping):
return LDAPPropertyMappingSerializer
def __str__(self):
return self.name
return str(self.name)
class Meta:

View File

@ -99,7 +99,7 @@ class DuoDevice(SerializerModel, Device):
return DuoDeviceSerializer
def __str__(self):
return self.name or str(self.user)
return str(self.name) or str(self.user)
class Meta:

View File

@ -216,7 +216,7 @@ class SMSDevice(SerializerModel, SideChannelDevice):
return valid
def __str__(self):
return self.name or str(self.user)
return str(self.name) or str(self.user)
class Meta:
verbose_name = _("SMS Device")

View File

@ -146,7 +146,7 @@ class WebAuthnDevice(SerializerModel, Device):
return WebAuthnDeviceSerializer
def __str__(self):
return self.name or str(self.user)
return str(self.name) or str(self.user)
class Meta:

View File

@ -17,10 +17,7 @@ type RedisConfig struct {
Password string `yaml:"password" env:"AUTHENTIK_REDIS__PASSWORD"`
TLS bool `yaml:"tls" env:"AUTHENTIK_REDIS__TLS"`
TLSReqs string `yaml:"tls_reqs" env:"AUTHENTIK_REDIS__TLS_REQS"`
CacheDB int `yaml:"cache_db" env:"AUTHENTIK_REDIS__CACHE_DB"`
MessageQueueDB int `yaml:"message_queue_db" env:"AUTHENTIK_REDIS__MESSAGE_QUEUE_DB"`
WSDB int `yaml:"ws_db" env:"AUTHENTIK_REDIS__WS_DB"`
OutpostSessionDB int `yaml:"outpost_session_db" env:"AUTHENTIK_REDIS__OUTPOST_SESSION_DB"`
DB int `yaml:"cache_db" env:"AUTHENTIK_REDIS__CACHE_DB"`
CacheTimeout int `yaml:"cache_timeout" env:"AUTHENTIK_REDIS__CACHE_TIMEOUT"`
CacheTimeoutFlows int `yaml:"cache_timeout_flows" env:"AUTHENTIK_REDIS__CACHE_TIMEOUT_FLOWS"`
CacheTimeoutPolicies int `yaml:"cache_timeout_policies" env:"AUTHENTIK_REDIS__CACHE_TIMEOUT_POLICIES"`

View File

@ -16,11 +16,12 @@ import (
func (a *Application) getStore(p api.ProxyOutpostConfig, externalHost *url.URL) sessions.Store {
var store sessions.Store
if config.Get().Redis.Host != "" {
rs, err := redistore.NewRediStoreWithDB(10, "tcp", fmt.Sprintf("%s:%d", config.Get().Redis.Host, config.Get().Redis.Port), config.Get().Redis.Password, strconv.Itoa(config.Get().Redis.OutpostSessionDB), []byte(*p.CookieSecret))
rs, err := redistore.NewRediStoreWithDB(10, "tcp", fmt.Sprintf("%s:%d", config.Get().Redis.Host, config.Get().Redis.Port), config.Get().Redis.Password, strconv.Itoa(config.Get().Redis.DB), []byte(*p.CookieSecret))
if err != nil {
panic(err)
}
rs.SetMaxLength(math.MaxInt)
rs.SetKeyPrefix("authentik_proxy_session_")
if p.TokenValidity.IsSet() {
t := p.TokenValidity.Get()
// Add one to the validity to ensure we don't have a session with indefinite length

View File

@ -43,7 +43,7 @@ if CONFIG.y_bool("redis.tls", False):
REDIS_URL = (
f"{REDIS_PROTOCOL_PREFIX}:"
f"{quote_plus(CONFIG.y('redis.password'))}@{quote_plus(CONFIG.y('redis.host'))}:"
f"{int(CONFIG.y('redis.port'))}/{CONFIG.y('redis.ws_db')}"
f"{int(CONFIG.y('redis.port'))}/{CONFIG.y('redis.db')}"
)
while True:
try:

View File

@ -11550,7 +11550,6 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/PasswordPolicyRequest'
required: true
security:
- authentik: []
responses:
@ -11625,7 +11624,6 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/PasswordPolicyRequest'
required: true
security:
- authentik: []
responses:
@ -32929,7 +32927,6 @@ components:
required:
- bound_to
- component
- error_message
- meta_model_name
- pk
- verbose_name
@ -32975,7 +32972,6 @@ components:
minLength: 1
error_message:
type: string
minLength: 1
check_static_rules:
type: boolean
check_have_i_been_pwned:
@ -32993,8 +32989,6 @@ components:
minimum: 0
description: If the zxcvbn score is equal or less than this value, the policy
will fail.
required:
- error_message
PasswordStage:
type: object
description: PasswordStage Serializer
@ -34254,7 +34248,6 @@ components:
minLength: 1
error_message:
type: string
minLength: 1
check_static_rules:
type: boolean
check_have_i_been_pwned:

View File

@ -39,10 +39,7 @@ kubectl exec -it deployment/authentik-worker -c authentik -- ak dump_config
- `AUTHENTIK_REDIS__HOST`: Hostname of your Redis Server
- `AUTHENTIK_REDIS__PORT`: Redis port, defaults to 6379
- `AUTHENTIK_REDIS__PASSWORD`: Password for your Redis Server
- `AUTHENTIK_REDIS__CACHE_DB`: Database for caching, defaults to 0
- `AUTHENTIK_REDIS__MESSAGE_QUEUE_DB`: Database for the message queue, defaults to 1
- `AUTHENTIK_REDIS__WS_DB`: Database for websocket connections, defaults to 2
- `AUTHENTIK_REDIS__OUTPOST_SESSION_DB`: Database for sessions for the embedded outpost, defaults to 3
- `AUTHENTIK_REDIS__DB`: Database, defaults to 0
- `AUTHENTIK_REDIS__CACHE_TIMEOUT`: Timeout for cached data until it expires in seconds, defaults to 300
- `AUTHENTIK_REDIS__CACHE_TIMEOUT_FLOWS`: Timeout for cached flow plans until they expire in seconds, defaults to 300
- `AUTHENTIK_REDIS__CACHE_TIMEOUT_POLICIES`: Timeout for cached policies until they expire in seconds, defaults to 300

View File

@ -9,6 +9,10 @@ slug: "2022.11"
The policy has been merged with the password policy which provides the same functionality. Existing Have I Been Pwned policies will automatically be migrated.
- Instead of using multiple redis databases, authentik now uses a single redis database
This will temporarily loose some cached information after the upgrade, like cached system tasks and policy results. This data will be re-cached in the background.
## New features
- authentik now runs on Python 3.11