lib: fix outpost fake-ip not working, add tests
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
parent
7370dd5f3f
commit
c05240afbf
67
authentik/lib/tests/test_http.py
Normal file
67
authentik/lib/tests/test_http.py
Normal file
|
@ -0,0 +1,67 @@
|
|||
"""Test HTTP Helpers"""
|
||||
from django.test import RequestFactory, TestCase
|
||||
|
||||
from authentik.core.models import (
|
||||
USER_ATTRIBUTE_CAN_OVERRIDE_IP,
|
||||
Token,
|
||||
TokenIntents,
|
||||
User,
|
||||
)
|
||||
from authentik.lib.utils.http import (
|
||||
OUTPOST_REMOTE_IP_HEADER,
|
||||
OUTPOST_TOKEN_HEADER,
|
||||
get_client_ip,
|
||||
)
|
||||
|
||||
|
||||
class TestHTTP(TestCase):
|
||||
"""Test HTTP Helpers"""
|
||||
|
||||
def setUp(self) -> None:
|
||||
self.user = User.objects.get(username="akadmin")
|
||||
self.factory = RequestFactory()
|
||||
|
||||
def test_normal(self):
|
||||
"""Test normal request"""
|
||||
request = self.factory.get("/")
|
||||
self.assertEqual(get_client_ip(request), "127.0.0.1")
|
||||
|
||||
def test_forward_for(self):
|
||||
"""Test x-forwarded-for request"""
|
||||
request = self.factory.get("/", HTTP_X_FORWARDED_FOR="127.0.0.2")
|
||||
self.assertEqual(get_client_ip(request), "127.0.0.2")
|
||||
|
||||
def test_fake_outpost(self):
|
||||
"""Test faked IP which is overridden by an outpost"""
|
||||
token = Token.objects.create(
|
||||
identifier="test", user=self.user, intent=TokenIntents.INTENT_API
|
||||
)
|
||||
# Invalid, non-existant token
|
||||
request = self.factory.get(
|
||||
"/",
|
||||
**{
|
||||
OUTPOST_REMOTE_IP_HEADER: "1.2.3.4",
|
||||
OUTPOST_TOKEN_HEADER: "abc",
|
||||
},
|
||||
)
|
||||
self.assertEqual(get_client_ip(request), "127.0.0.1")
|
||||
# Invalid, user doesn't have permisions
|
||||
request = self.factory.get(
|
||||
"/",
|
||||
**{
|
||||
OUTPOST_REMOTE_IP_HEADER: "1.2.3.4",
|
||||
OUTPOST_TOKEN_HEADER: token.key,
|
||||
},
|
||||
)
|
||||
self.assertEqual(get_client_ip(request), "127.0.0.1")
|
||||
# Valid
|
||||
self.user.attributes[USER_ATTRIBUTE_CAN_OVERRIDE_IP] = True
|
||||
self.user.save()
|
||||
request = self.factory.get(
|
||||
"/",
|
||||
**{
|
||||
OUTPOST_REMOTE_IP_HEADER: "1.2.3.4",
|
||||
OUTPOST_TOKEN_HEADER: token.key,
|
||||
},
|
||||
)
|
||||
self.assertEqual(get_client_ip(request), "1.2.3.4")
|
|
@ -40,24 +40,30 @@ def _get_outpost_override_ip(request: HttpRequest) -> Optional[str]:
|
|||
or OUTPOST_TOKEN_HEADER not in request.META
|
||||
):
|
||||
return None
|
||||
fake_ip = request.META[OUTPOST_REMOTE_IP_HEADER]
|
||||
tokens = Token.filter_not_expired(
|
||||
key=request.META.get(OUTPOST_TOKEN_HEADER), intent=TokenIntents.INTENT_API
|
||||
)
|
||||
if not tokens.exists():
|
||||
LOGGER.warning("Attempted remote-ip override without token")
|
||||
LOGGER.warning("Attempted remote-ip override without token", fake_ip=fake_ip)
|
||||
return None
|
||||
user = tokens.first().user
|
||||
if user.group_attributes().get(USER_ATTRIBUTE_CAN_OVERRIDE_IP, False):
|
||||
if not user.group_attributes().get(USER_ATTRIBUTE_CAN_OVERRIDE_IP, False):
|
||||
LOGGER.warning(
|
||||
"Remote-IP override: user doesn't have permission",
|
||||
user=user,
|
||||
fake_ip=fake_ip,
|
||||
)
|
||||
return None
|
||||
return request.META[OUTPOST_REMOTE_IP_HEADER]
|
||||
return fake_ip
|
||||
|
||||
|
||||
def get_client_ip(request: Optional[HttpRequest]) -> str:
|
||||
"""Attempt to get the client's IP by checking common HTTP Headers.
|
||||
Returns none if no IP Could be found"""
|
||||
if request:
|
||||
if not request:
|
||||
return DEFAULT_IP
|
||||
override = _get_outpost_override_ip(request)
|
||||
if override:
|
||||
return override
|
||||
return _get_client_ip_from_meta(request.META)
|
||||
return DEFAULT_IP
|
||||
|
|
Reference in a new issue