events: cleanse http query string in events (#5508)

* events: cleanse http query string in events

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add more tests

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

---------

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
Jens L 2023-05-07 20:11:36 +02:00 committed by GitHub
parent 53f827b54f
commit 7df0e88b9d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 66 additions and 3 deletions

View file

@ -219,13 +219,13 @@ class Event(SerializerModel, ExpiringModel):
self.context["http_request"] = {
"path": request.path,
"method": request.method,
"args": QueryDict(request.META.get("QUERY_STRING", "")),
"args": cleanse_dict(QueryDict(request.META.get("QUERY_STRING", ""))),
}
# Special case for events created during flow execution
# since they keep the http query within a wrapped query
if QS_QUERY in self.context["http_request"]["args"]:
wrapped = self.context["http_request"]["args"][QS_QUERY]
self.context["http_request"]["args"] = QueryDict(wrapped)
self.context["http_request"]["args"] = cleanse_dict(QueryDict(wrapped))
if hasattr(request, "tenant"):
tenant: Tenant = request.tenant
# Because self.created only gets set on save, we can't use it's value here

View file

@ -1,17 +1,25 @@
"""event tests"""
from urllib.parse import urlencode
from django.contrib.contenttypes.models import ContentType
from django.test import TestCase
from django.test import RequestFactory, TestCase
from django.views.debug import SafeExceptionReporterFilter
from guardian.shortcuts import get_anonymous_user
from authentik.core.models import Group
from authentik.events.models import Event
from authentik.flows.views.executor import QS_QUERY
from authentik.lib.generators import generate_id
from authentik.policies.dummy.models import DummyPolicy
from authentik.tenants.models import Tenant
class TestEvents(TestCase):
"""Test Event"""
def setUp(self) -> None:
self.factory = RequestFactory()
def test_new_with_model(self):
"""Create a new Event passing a model as kwarg"""
test_model = Group.objects.create(name="test")
@ -40,3 +48,58 @@ class TestEvents(TestCase):
model_content_type = ContentType.objects.get_for_model(temp_model)
self.assertEqual(event.context.get("model").get("app"), model_content_type.app_label)
self.assertEqual(event.context.get("model").get("pk"), temp_model.pk.hex)
def test_from_http_basic(self):
"""Test plain from_http"""
event = Event.new("unittest").from_http(self.factory.get("/"))
self.assertEqual(
event.context, {"http_request": {"args": {}, "method": "GET", "path": "/"}}
)
def test_from_http_clean_querystring(self):
"""Test cleansing query string"""
request = self.factory.get(f"/?token={generate_id()}")
event = Event.new("unittest").from_http(request)
self.assertEqual(
event.context,
{
"http_request": {
"args": {"token": SafeExceptionReporterFilter.cleansed_substitute},
"method": "GET",
"path": "/",
}
},
)
def test_from_http_clean_querystring_flow(self):
"""Test cleansing query string (nested query string like flow executor)"""
nested_qs = {"token": generate_id()}
request = self.factory.get(f"/?{QS_QUERY}={urlencode(nested_qs)}")
event = Event.new("unittest").from_http(request)
self.assertEqual(
event.context,
{
"http_request": {
"args": {"token": SafeExceptionReporterFilter.cleansed_substitute},
"method": "GET",
"path": "/",
}
},
)
def test_from_http_tenant(self):
"""Test from_http tenant"""
# Test tenant
request = self.factory.get("/")
tenant = Tenant(domain="test-tenant")
setattr(request, "tenant", tenant)
event = Event.new("unittest").from_http(request)
self.assertEqual(
event.tenant,
{
"app": "authentik_tenants",
"model_name": "tenant",
"name": "Tenant test-tenant",
"pk": tenant.pk.hex,
},
)