core: Add resolve_dns and reverse_dns functions to evaluator (#4769)

* Add resolve_dns

* Add reverse_dns

* Fix lint

* add caching, small optimisation

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* Added time-aware LRU cache

---------

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
Co-authored-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
sdimovv 2023-03-01 23:15:13 +02:00 committed by GitHub
parent 2eb7c16a9a
commit a6eba37d5a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 0 deletions

View file

@ -1,9 +1,11 @@
"""authentik expression policy evaluator"""
import re
import socket
from ipaddress import ip_address, ip_network
from textwrap import indent
from typing import Any, Iterable, Optional
from cachetools import TLRUCache, cached
from django.core.exceptions import FieldError
from django_otp import devices_for_user
from rest_framework.serializers import ValidationError
@ -41,6 +43,8 @@ class BaseEvaluator:
"ak_is_group_member": BaseEvaluator.expr_is_group_member,
"ak_user_by": BaseEvaluator.expr_user_by,
"ak_user_has_authenticator": BaseEvaluator.expr_func_user_has_authenticator,
"resolve_dns": BaseEvaluator.expr_resolve_dns,
"reverse_dns": BaseEvaluator.expr_reverse_dns,
"ak_create_event": self.expr_event_create,
"ak_logger": get_logger(self._filename).bind(),
"requests": get_http_session(),
@ -49,6 +53,39 @@ class BaseEvaluator:
}
self._context = {}
@cached(cache=TLRUCache(maxsize=32, ttu=lambda key, value, now: now + 180))
@staticmethod
def expr_resolve_dns(host: str, ip_version: Optional[int] = None) -> list[str]:
"""Resolve host to a list of IPv4 and/or IPv6 addresses."""
# Although it seems to be fine (raising OSError), docs warn
# against passing `None` for both the host and the port
# https://docs.python.org/3/library/socket.html#socket.getaddrinfo
host = host or ""
ip_list = []
family = 0
if ip_version == 4:
family = socket.AF_INET
if ip_version == 6:
family = socket.AF_INET6
try:
for ip_addr in socket.getaddrinfo(host, None, family=family):
ip_list.append(str(ip_addr[4][0]))
except OSError:
pass
return list(set(ip_list))
@cached(cache=TLRUCache(maxsize=32, ttu=lambda key, value, now: now + 180))
@staticmethod
def expr_reverse_dns(ip_addr: str) -> str:
"""Perform a reverse DNS lookup."""
try:
return socket.getfqdn(ip_addr)
except OSError:
return ip_addr
@staticmethod
def expr_flatten(value: list[Any] | Any) -> Optional[Any]:
"""Flatten `value` if its a list"""

View file

@ -100,3 +100,30 @@ You can also check if an IP Address is within a subnet by writing the following:
ip_address('192.0.2.1') in ip_network('192.0.2.0/24')
# evaluates to True
```
## DNS resolution and reverse DNS lookups
:::note
Requires authentik 2023.3 or higher
:::
To resolve a hostname to a list of IP addresses, use the functions `resolve_dns(hostname)` and `resolve_dns(hostname, ip_version)`.
```python
resolve_dns("google.com") # return a list of all IPv4 and IPv6 addresses
resolve_dns("google.com", 4) # return a list of only IP4 addresses
resolve_dns("google.com", 6) # return a list of only IP6 addresses
```
You can also do reverse DNS lookups.
:::note
Reverse DNS lookups may not return the expected host if the IP address is part of a shared hosting environment.
See: https://stackoverflow.com/a/19867936
:::
To perform a reverse DNS lookup use `reverse_dns("192.0.2.0")`. If no DNS records are found the original IP address is returned.
:::info
DNS resolving results are cached in memory. The last 32 unique queries are cached for up to 3 minutes.
:::