events: correctly handle lists for cleaning/sanitization
Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
parent
4d9c9160e7
commit
f7601d9571
|
@ -23,24 +23,32 @@ from authentik.policies.types import PolicyRequest
|
|||
ALLOWED_SPECIAL_KEYS = re.compile("passing", flags=re.I)
|
||||
|
||||
|
||||
def cleanse_item(key: str, value: Any) -> Any:
|
||||
"""Cleanse a single item"""
|
||||
if isinstance(value, dict):
|
||||
return cleanse_dict(value)
|
||||
elif isinstance(value, list):
|
||||
for idx, item in enumerate(value):
|
||||
if isinstance(value, dict):
|
||||
value[idx] = cleanse_dict(item)
|
||||
try:
|
||||
if SafeExceptionReporterFilter.hidden_settings.search(
|
||||
key
|
||||
) and not ALLOWED_SPECIAL_KEYS.search(key):
|
||||
return SafeExceptionReporterFilter.cleansed_substitute
|
||||
else:
|
||||
return value
|
||||
except TypeError: # pragma: no cover
|
||||
return value
|
||||
|
||||
|
||||
def cleanse_dict(source: dict[Any, Any]) -> dict[Any, Any]:
|
||||
"""Cleanse a dictionary, recursively"""
|
||||
final_dict = {}
|
||||
for key, value in source.items():
|
||||
try:
|
||||
if SafeExceptionReporterFilter.hidden_settings.search(
|
||||
key
|
||||
) and not ALLOWED_SPECIAL_KEYS.search(key):
|
||||
final_dict[key] = SafeExceptionReporterFilter.cleansed_substitute
|
||||
else:
|
||||
final_dict[key] = value
|
||||
except TypeError: # pragma: no cover
|
||||
final_dict[key] = value
|
||||
if isinstance(value, dict):
|
||||
final_dict[key] = cleanse_dict(value)
|
||||
elif isinstance(value, list):
|
||||
for idx, item in enumerate(value):
|
||||
value[idx] = cleanse_dict(item)
|
||||
new_value = cleanse_item(key, value)
|
||||
if new_value:
|
||||
final_dict[key] = new_value
|
||||
return final_dict
|
||||
|
||||
|
||||
|
@ -73,6 +81,41 @@ def get_user(user: User, original_user: Optional[User] = None) -> dict[str, Any]
|
|||
return user_data
|
||||
|
||||
|
||||
# pylint: disable=too-many-return-statements
|
||||
def sanitize_item(value: Any) -> Any:
|
||||
"""Sanitize a single item, ensure it is JSON parsable"""
|
||||
if is_dataclass(value):
|
||||
# Because asdict calls `copy.deepcopy(obj)` on everything that's not tuple/dict,
|
||||
# and deepcopy doesn't work with HttpRequests (neither django nor rest_framework).
|
||||
# Currently, the only dataclass that actually holds an http request is a PolicyRequest
|
||||
if isinstance(value, PolicyRequest):
|
||||
value.http_request = None
|
||||
value = asdict(value)
|
||||
if isinstance(value, dict):
|
||||
return sanitize_dict(value)
|
||||
elif isinstance(value, list):
|
||||
for idx, item in enumerate(value):
|
||||
value[idx] = sanitize_item(item)
|
||||
elif isinstance(value, (User, AnonymousUser)):
|
||||
return sanitize_dict(get_user(value))
|
||||
elif isinstance(value, models.Model):
|
||||
return sanitize_dict(model_to_dict(value))
|
||||
elif isinstance(value, UUID):
|
||||
return value.hex
|
||||
elif isinstance(value, (HttpRequest, WSGIRequest)):
|
||||
return None
|
||||
elif isinstance(value, City):
|
||||
return GEOIP_READER.city_to_dict(value)
|
||||
elif isinstance(value, Path):
|
||||
return str(value)
|
||||
elif isinstance(value, type):
|
||||
return {
|
||||
"type": value.__name__,
|
||||
"module": value.__module__,
|
||||
}
|
||||
return value
|
||||
|
||||
|
||||
def sanitize_dict(source: dict[Any, Any]) -> dict[Any, Any]:
|
||||
"""clean source of all Models that would interfere with the JSONField.
|
||||
Models are replaced with a dictionary of {
|
||||
|
@ -82,35 +125,7 @@ def sanitize_dict(source: dict[Any, Any]) -> dict[Any, Any]:
|
|||
}"""
|
||||
final_dict = {}
|
||||
for key, value in source.items():
|
||||
if is_dataclass(value):
|
||||
# Because asdict calls `copy.deepcopy(obj)` on everything that's not tuple/dict,
|
||||
# and deepcopy doesn't work with HttpRequests (neither django nor rest_framework).
|
||||
# Currently, the only dataclass that actually holds an http request is a PolicyRequest
|
||||
if isinstance(value, PolicyRequest):
|
||||
value.http_request = None
|
||||
value = asdict(value)
|
||||
if isinstance(value, dict):
|
||||
final_dict[key] = sanitize_dict(value)
|
||||
elif isinstance(value, list):
|
||||
for idx, item in enumerate(value):
|
||||
value[idx] = sanitize_dict(item)
|
||||
elif isinstance(value, (User, AnonymousUser)):
|
||||
final_dict[key] = sanitize_dict(get_user(value))
|
||||
elif isinstance(value, models.Model):
|
||||
final_dict[key] = sanitize_dict(model_to_dict(value))
|
||||
elif isinstance(value, UUID):
|
||||
final_dict[key] = value.hex
|
||||
elif isinstance(value, (HttpRequest, WSGIRequest)):
|
||||
continue
|
||||
elif isinstance(value, City):
|
||||
final_dict[key] = GEOIP_READER.city_to_dict(value)
|
||||
elif isinstance(value, Path):
|
||||
final_dict[key] = str(value)
|
||||
elif isinstance(value, type):
|
||||
final_dict[key] = {
|
||||
"type": value.__name__,
|
||||
"module": value.__module__,
|
||||
}
|
||||
else:
|
||||
final_dict[key] = value
|
||||
new_value = sanitize_item(value)
|
||||
if new_value:
|
||||
final_dict[key] = new_value
|
||||
return final_dict
|
||||
|
|
Reference in a new issue