root: improve code style (#4436)

* cleanup pylint comments

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* remove more

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix url name

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* *: use ExtractHour instead of ExtractDay

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
Jens L 2023-01-15 17:02:31 +01:00 committed by GitHub
parent 69d4719687
commit 9568f4dbd6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
65 changed files with 56 additions and 164 deletions

View file

@ -1,7 +1,7 @@
"""authentik administration metrics"""
from datetime import timedelta
from django.db.models.functions import ExtractDay
from django.db.models.functions import ExtractHour
from drf_spectacular.utils import extend_schema, extend_schema_field
from guardian.shortcuts import get_objects_for_user
from rest_framework.fields import IntegerField, SerializerMethodField
@ -37,7 +37,7 @@ class LoginMetricsSerializer(PassiveSerializer):
action=EventAction.LOGIN
)
# 3 data points per day, so 8 hour spans
.get_events_per(timedelta(days=7), ExtractDay, 7 * 3)
.get_events_per(timedelta(days=7), ExtractHour, 7 * 3)
)
@extend_schema_field(CoordinateSerializer(many=True))
@ -49,7 +49,7 @@ class LoginMetricsSerializer(PassiveSerializer):
action=EventAction.LOGIN_FAILED
)
# 3 data points per day, so 8 hour spans
.get_events_per(timedelta(days=7), ExtractDay, 7 * 3)
.get_events_per(timedelta(days=7), ExtractHour, 7 * 3)
)
@extend_schema_field(CoordinateSerializer(many=True))
@ -61,7 +61,7 @@ class LoginMetricsSerializer(PassiveSerializer):
action=EventAction.AUTHORIZE_APPLICATION
)
# 3 data points per day, so 8 hour spans
.get_events_per(timedelta(days=7), ExtractDay, 7 * 3)
.get_events_per(timedelta(days=7), ExtractHour, 7 * 3)
)

View file

@ -79,7 +79,6 @@ class TaskViewSet(ViewSet):
),
],
)
# pylint: disable=invalid-name
def retrieve(self, request: Request, pk=None) -> Response:
"""Get a single system task"""
task = TaskInfo.by_name(pk)
@ -110,7 +109,6 @@ class TaskViewSet(ViewSet):
],
)
@action(detail=True, methods=["post"])
# pylint: disable=invalid-name
def retry(self, request: Request, pk=None) -> Response:
"""Retry task"""
task = TaskInfo.by_name(pk)

View file

@ -8,7 +8,6 @@ from authentik.root.monitoring import monitoring_set
@receiver(monitoring_set)
# pylint: disable=unused-argument
def monitoring_set_workers(sender, **kwargs):
"""Set worker gauge"""
count = len(CELERY_APP.control.ping(timeout=0.5))
@ -16,7 +15,6 @@ def monitoring_set_workers(sender, **kwargs):
@receiver(monitoring_set)
# pylint: disable=unused-argument
def monitoring_set_tasks(sender, **kwargs):
"""Set task gauges"""
for task in TaskInfo.all().values():

View file

@ -66,7 +66,6 @@ class BlueprintEntry:
identifiers: dict[str, Any] = field(default_factory=dict)
attrs: Optional[dict[str, Any]] = field(default_factory=dict)
# pylint: disable=invalid-name
id: Optional[str] = None
_state: BlueprintEntryState = field(default_factory=BlueprintEntryState)
@ -192,7 +191,6 @@ class KeyOf(YAMLTag):
id_from: str
# pylint: disable=unused-argument
def __init__(self, loader: "BlueprintLoader", node: ScalarNode) -> None:
super().__init__()
self.id_from = node.value
@ -219,7 +217,6 @@ class Env(YAMLTag):
key: str
default: Optional[Any]
# pylint: disable=unused-argument
def __init__(self, loader: "BlueprintLoader", node: ScalarNode | SequenceNode) -> None:
super().__init__()
self.default = None
@ -239,7 +236,6 @@ class Context(YAMLTag):
key: str
default: Optional[Any]
# pylint: disable=unused-argument
def __init__(self, loader: "BlueprintLoader", node: ScalarNode | SequenceNode) -> None:
super().__init__()
self.default = None
@ -262,7 +258,6 @@ class Format(YAMLTag):
format_string: str
args: list[Any]
# pylint: disable=unused-argument
def __init__(self, loader: "BlueprintLoader", node: SequenceNode) -> None:
super().__init__()
self.format_string = node.value[0].value
@ -342,7 +337,6 @@ class Condition(YAMLTag):
"XNOR": lambda args: not (reduce(ixor, args) if len(args) > 1 else args[0]),
}
# pylint: disable=unused-argument
def __init__(self, loader: "BlueprintLoader", node: SequenceNode) -> None:
super().__init__()
self.mode = node.value[0].value
@ -375,7 +369,6 @@ class If(YAMLTag):
when_true: Any
when_false: Any
# pylint: disable=unused-argument
def __init__(self, loader: "BlueprintLoader", node: SequenceNode) -> None:
super().__init__()
self.condition = loader.construct_object(node.value[0])
@ -414,7 +407,6 @@ class Enumerate(YAMLTag, YAMLTagContext):
),
}
# pylint: disable=unused-argument
def __init__(self, loader: "BlueprintLoader", node: SequenceNode) -> None:
super().__init__()
self.iterable = loader.construct_object(node.value[0])
@ -422,7 +414,6 @@ class Enumerate(YAMLTag, YAMLTagContext):
self.item_body = loader.construct_object(node.value[2])
self.__current_context: tuple[Any, Any] = tuple()
# pylint: disable=unused-argument
def get_context(self, entry: BlueprintEntry, blueprint: Blueprint) -> Any:
return self.__current_context
@ -480,7 +471,6 @@ class EnumeratedItem(YAMLTag):
_SUPPORTED_CONTEXT_TAGS = (Enumerate,)
# pylint: disable=unused-argument
def __init__(self, loader: "BlueprintLoader", node: ScalarNode) -> None:
super().__init__()
self.depth = int(node.value)

View file

@ -4,7 +4,7 @@ from typing import Optional
from django.core.cache import cache
from django.db.models import QuerySet
from django.db.models.functions import ExtractDay
from django.db.models.functions import ExtractHour
from django.http.response import HttpResponseBadRequest
from django.shortcuts import get_object_or_404
from drf_spectacular.types import OpenApiTypes
@ -227,7 +227,6 @@ class ApplicationViewSet(UsedByMixin, ModelViewSet):
methods=["POST"],
parser_classes=(MultiPartParser,),
)
# pylint: disable=unused-argument
def set_icon(self, request: Request, slug: str):
"""Set application icon"""
app: Application = self.get_object()
@ -247,7 +246,6 @@ class ApplicationViewSet(UsedByMixin, ModelViewSet):
filter_backends=[],
methods=["POST"],
)
# pylint: disable=unused-argument
def set_icon_url(self, request: Request, slug: str):
"""Set application icon (as URL)"""
app: Application = self.get_object()
@ -256,7 +254,6 @@ class ApplicationViewSet(UsedByMixin, ModelViewSet):
@permission_required("authentik_core.view_application", ["authentik_events.view_event"])
@extend_schema(responses={200: CoordinateSerializer(many=True)})
@action(detail=True, pagination_class=None, filter_backends=[])
# pylint: disable=unused-argument
def metrics(self, request: Request, slug: str):
"""Metrics for application logins"""
app = self.get_object()
@ -266,5 +263,5 @@ class ApplicationViewSet(UsedByMixin, ModelViewSet):
context__authorized_application__pk=app.pk.hex,
)
# 3 data points per day, so 8 hour spans
.get_events_per(timedelta(days=7), ExtractDay, 7 * 3)
.get_events_per(timedelta(days=7), ExtractHour, 7 * 3)
)

View file

@ -96,7 +96,6 @@ class GroupFilter(FilterSet):
queryset=User.objects.all(),
)
# pylint: disable=unused-argument
def filter_attributes(self, queryset, name, value):
"""Filter attributes by query args"""
try:
@ -157,7 +156,6 @@ class GroupViewSet(UsedByMixin, ModelViewSet):
},
)
@action(detail=True, methods=["POST"], pagination_class=None, filter_backends=[])
# pylint: disable=unused-argument, invalid-name
def add_user(self, request: Request, pk: str) -> Response:
"""Add user to group"""
group: Group = self.get_object()
@ -182,7 +180,6 @@ class GroupViewSet(UsedByMixin, ModelViewSet):
},
)
@action(detail=True, methods=["POST"], pagination_class=None, filter_backends=[])
# pylint: disable=unused-argument, invalid-name
def remove_user(self, request: Request, pk: str) -> Response:
"""Add user to group"""
group: Group = self.get_object()

View file

@ -117,7 +117,6 @@ class PropertyMappingViewSet(
],
)
@action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
# pylint: disable=unused-argument, invalid-name
def test(self, request: Request, pk: str) -> Response:
"""Test Property Mapping"""
mapping: PropertyMapping = self.get_object()

View file

@ -102,7 +102,6 @@ class SourceViewSet(
methods=["POST"],
parser_classes=(MultiPartParser,),
)
# pylint: disable=unused-argument
def set_icon(self, request: Request, slug: str):
"""Set source icon"""
source: Source = self.get_object()
@ -122,7 +121,6 @@ class SourceViewSet(
filter_backends=[],
methods=["POST"],
)
# pylint: disable=unused-argument
def set_icon_url(self, request: Request, slug: str):
"""Set source icon (as URL)"""
source: Source = self.get_object()

View file

@ -112,7 +112,6 @@ class TokenViewSet(UsedByMixin, ModelViewSet):
}
)
@action(detail=True, pagination_class=None, filter_backends=[], methods=["GET"])
# pylint: disable=unused-argument
def view_key(self, request: Request, identifier: str) -> Response:
"""Return token key and log access"""
token: Token = self.get_object()
@ -134,7 +133,6 @@ class TokenViewSet(UsedByMixin, ModelViewSet):
},
)
@action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
# pylint: disable=unused-argument
def set_key(self, request: Request, identifier: str) -> Response:
"""Return token key and log access"""
token: Token = self.get_object()

View file

@ -53,7 +53,7 @@ class UsedByMixin:
responses={200: UsedBySerializer(many=True)},
)
@action(detail=True, pagination_class=None, filter_backends=[])
# pylint: disable=invalid-name, unused-argument, too-many-locals
# pylint: disable=too-many-locals
def used_by(self, request: Request, *args, **kwargs) -> Response:
"""Get a list of all objects that use this object"""
# pyright: reportGeneralTypeIssues=false

View file

@ -4,7 +4,7 @@ from json import loads
from typing import Any, Optional
from django.contrib.auth import update_session_auth_hash
from django.db.models.functions import ExtractDay
from django.db.models.functions import ExtractHour
from django.db.models.query import QuerySet
from django.db.transaction import atomic
from django.db.utils import IntegrityError
@ -213,7 +213,7 @@ class UserMetricsSerializer(PassiveSerializer):
action=EventAction.LOGIN, user__pk=user.pk
)
# 3 data points per day, so 8 hour spans
.get_events_per(timedelta(days=7), ExtractDay, 7 * 3)
.get_events_per(timedelta(days=7), ExtractHour, 7 * 3)
)
@extend_schema_field(CoordinateSerializer(many=True))
@ -225,7 +225,7 @@ class UserMetricsSerializer(PassiveSerializer):
action=EventAction.LOGIN_FAILED, context__username=user.username
)
# 3 data points per day, so 8 hour spans
.get_events_per(timedelta(days=7), ExtractDay, 7 * 3)
.get_events_per(timedelta(days=7), ExtractHour, 7 * 3)
)
@extend_schema_field(CoordinateSerializer(many=True))
@ -237,7 +237,7 @@ class UserMetricsSerializer(PassiveSerializer):
action=EventAction.AUTHORIZE_APPLICATION, user__pk=user.pk
)
# 3 data points per day, so 8 hour spans
.get_events_per(timedelta(days=7), ExtractDay, 7 * 3)
.get_events_per(timedelta(days=7), ExtractHour, 7 * 3)
)
@ -269,7 +269,6 @@ class UsersFilter(FilterSet):
queryset=Group.objects.all(),
)
# pylint: disable=unused-argument
def filter_attributes(self, queryset, name, value):
"""Filter attributes by query args"""
try:
@ -404,9 +403,8 @@ class UserViewSet(UsedByMixin, ModelViewSet):
return Response(data={"non_field_errors": [str(exc)]}, status=400)
@extend_schema(responses={200: SessionUserSerializer(many=False)})
@action(detail=False, pagination_class=None, filter_backends=[])
# pylint: disable=invalid-name
def me(self, request: Request) -> Response:
@action(url_path="me", url_name="me", detail=False, pagination_class=None, filter_backends=[])
def user_me(self, request: Request) -> Response:
"""Get information about current user"""
context = {"request": request}
serializer = SessionUserSerializer(
@ -434,7 +432,6 @@ class UserViewSet(UsedByMixin, ModelViewSet):
},
)
@action(detail=True, methods=["POST"])
# pylint: disable=invalid-name, unused-argument
def set_password(self, request: Request, pk: int) -> Response:
"""Set password for user"""
user: User = self.get_object()
@ -452,7 +449,6 @@ class UserViewSet(UsedByMixin, ModelViewSet):
@permission_required("authentik_core.view_user", ["authentik_events.view_event"])
@extend_schema(responses={200: UserMetricsSerializer(many=False)})
@action(detail=True, pagination_class=None, filter_backends=[])
# pylint: disable=invalid-name, unused-argument
def metrics(self, request: Request, pk: int) -> Response:
"""User metrics per 1h"""
user: User = self.get_object()
@ -468,7 +464,6 @@ class UserViewSet(UsedByMixin, ModelViewSet):
},
)
@action(detail=True, pagination_class=None, filter_backends=[])
# pylint: disable=invalid-name, unused-argument
def recovery(self, request: Request, pk: int) -> Response:
"""Create a temporary link that a user can use to recover their accounts"""
link, _ = self._create_recovery_link()
@ -493,7 +488,6 @@ class UserViewSet(UsedByMixin, ModelViewSet):
},
)
@action(detail=True, pagination_class=None, filter_backends=[])
# pylint: disable=invalid-name, unused-argument
def recovery_email(self, request: Request, pk: int) -> Response:
"""Create a temporary link that a user can use to recover their accounts"""
for_user: User = self.get_object()

View file

@ -49,7 +49,6 @@ class Command(BaseCommand):
return namespace
@staticmethod
# pylint: disable=unused-argument
def post_save_handler(sender, instance: Model, created: bool, **_):
"""Signal handler for all object's post_save"""
if not should_log_model(instance):
@ -65,7 +64,6 @@ class Command(BaseCommand):
).save()
@staticmethod
# pylint: disable=unused-argument
def pre_delete_handler(sender, instance: Model, **_):
"""Signal handler for all object's pre_delete"""
if not should_log_model(instance): # pragma: no cover

View file

@ -20,7 +20,6 @@ if TYPE_CHECKING:
@receiver(post_save)
# pylint: disable=unused-argument
def post_save_application(sender: type[Model], instance, created: bool, **_):
"""Clear user's application cache upon application creation"""
from authentik.core.api.applications import user_app_cache_key
@ -36,7 +35,6 @@ def post_save_application(sender: type[Model], instance, created: bool, **_):
@receiver(user_logged_in)
# pylint: disable=unused-argument
def user_logged_in_session(sender, request: HttpRequest, user: "User", **_):
"""Create an AuthenticatedSession from request"""
from authentik.core.models import AuthenticatedSession
@ -47,7 +45,6 @@ def user_logged_in_session(sender, request: HttpRequest, user: "User", **_):
@receiver(user_logged_out)
# pylint: disable=unused-argument
def user_logged_out_session(sender, request: HttpRequest, user: "User", **_):
"""Delete AuthenticatedSession if it exists"""
from authentik.core.models import AuthenticatedSession

View file

@ -48,7 +48,6 @@ class Action(Enum):
class MessageStage(StageView):
"""Show a pre-configured message after the flow is done"""
# pylint: disable=unused-argument
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
"""Show a pre-configured message after the flow is done"""
message = getattr(self.executor.current_stage, "message", "")
@ -209,7 +208,6 @@ class SourceFlowManager:
response.error_message = error.messages
return response
# pylint: disable=unused-argument
def get_stages_to_append(self, flow: Flow) -> list[Stage]:
"""Hook to override stages which are appended to the flow"""
if not self.source.enrollment_flow:
@ -264,7 +262,6 @@ class SourceFlowManager:
flow_slug=flow.slug,
)
# pylint: disable=unused-argument
def handle_auth(
self,
connection: UserSourceConnection,

View file

@ -13,7 +13,6 @@ class PostUserEnrollmentStage(StageView):
"""Dynamically injected stage which saves the Connection after
the user has been enrolled."""
# pylint: disable=unused-argument
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
"""Stage used after the user has been enrolled"""
connection: UserSourceConnection = self.executor.plan.context[

View file

@ -187,7 +187,6 @@ class CertificateKeyPairFilter(FilterSet):
label="Only return certificate-key pairs with keys", method="filter_has_key"
)
# pylint: disable=unused-argument
def filter_has_key(self, queryset, name, value): # pragma: no cover
"""Only return certificate-key pairs with keys"""
return queryset.exclude(key_data__exact="")
@ -256,7 +255,6 @@ class CertificateKeyPairViewSet(UsedByMixin, ModelViewSet):
responses={200: CertificateDataSerializer(many=False)},
)
@action(detail=True, pagination_class=None, filter_backends=[])
# pylint: disable=invalid-name, unused-argument
def view_certificate(self, request: Request, pk: str) -> Response:
"""Return certificate-key pairs certificate and log access"""
certificate: CertificateKeyPair = self.get_object()
@ -287,7 +285,6 @@ class CertificateKeyPairViewSet(UsedByMixin, ModelViewSet):
responses={200: CertificateDataSerializer(many=False)},
)
@action(detail=True, pagination_class=None, filter_backends=[])
# pylint: disable=invalid-name, unused-argument
def view_private_key(self, request: Request, pk: str) -> Response:
"""Return certificate-key pairs private key and log access"""
certificate: CertificateKeyPair = self.get_object()

View file

@ -83,7 +83,6 @@ class EventsFilter(django_filters.FilterSet):
label="Tenant name",
)
# pylint: disable=unused-argument
def filter_context_model_pk(self, queryset, name, value):
"""Because we store the PK as UUID.hex,
we need to remove the dashes that a client may send. We can't use a

View file

@ -80,7 +80,6 @@ class NotificationTransportViewSet(UsedByMixin, ModelViewSet):
request=OpenApiTypes.NONE,
)
@action(detail=True, pagination_class=None, filter_backends=[], methods=["post"])
# pylint: disable=invalid-name, unused-argument
def test(self, request: Request, pk=None) -> Response:
"""Send example notification using selected transport. Requires
Modify permissions."""

View file

@ -101,7 +101,6 @@ class AuditMiddleware:
self.disconnect(request)
return response
# pylint: disable=unused-argument
def process_exception(self, request: HttpRequest, exception: Exception):
"""Disconnect handlers in case of exception"""
self.disconnect(request)
@ -125,7 +124,6 @@ class AuditMiddleware:
thread.run()
@staticmethod
# pylint: disable=unused-argument
def post_save_handler(
user: User, request: HttpRequest, sender, instance: Model, created: bool, **_
):
@ -137,7 +135,6 @@ class AuditMiddleware:
EventNewThread(action, request, user=user, model=model_to_dict(instance)).run()
@staticmethod
# pylint: disable=unused-argument
def pre_delete_handler(user: User, request: HttpRequest, sender, instance: Model, **_):
"""Signal handler for all object's pre_delete"""
if not should_log_model(instance): # pragma: no cover

View file

@ -133,12 +133,11 @@ class EventQuerySet(QuerySet):
)
data = Counter({int(d["age_interval"]): d["count"] for d in result})
results = []
interval_timdelta = time_since / data_points
interval_delta = time_since / data_points
for interval in range(1, -data_points, -1):
results.append(
{
"x_cord": time.mktime((_now + (interval_timdelta * interval)).timetuple())
* 1000,
"x_cord": time.mktime((_now + (interval_delta * interval)).timetuple()) * 1000,
"y_cord": data[interval * -1],
}
)

View file

@ -22,7 +22,6 @@ SESSION_LOGIN_EVENT = "login_event"
@receiver(user_logged_in)
# pylint: disable=unused-argument
def on_user_logged_in(sender, request: HttpRequest, user: User, **_):
"""Log successful login"""
kwargs = {}
@ -45,14 +44,12 @@ def get_login_event(request: HttpRequest) -> Optional[Event]:
@receiver(user_logged_out)
# pylint: disable=unused-argument
def on_user_logged_out(sender, request: HttpRequest, user: User, **_):
"""Log successfully logout"""
Event.new(EventAction.LOGOUT).from_http(request, user=user)
@receiver(user_write)
# pylint: disable=unused-argument
def on_user_write(sender, request: HttpRequest, user: User, data: dict[str, Any], **kwargs):
"""Log User write"""
data["created"] = kwargs.get("created", False)
@ -60,7 +57,6 @@ def on_user_write(sender, request: HttpRequest, user: User, data: dict[str, Any]
@receiver(login_failed)
# pylint: disable=unused-argument
def on_login_failed(
signal,
sender,
@ -74,7 +70,6 @@ def on_login_failed(
@receiver(invitation_used)
# pylint: disable=unused-argument
def on_invitation_used(sender, request: HttpRequest, invitation: Invitation, **_):
"""Log Invitation usage"""
Event.new(EventAction.INVITE_USED, invitation_uuid=invitation.invite_uuid.hex).from_http(
@ -83,21 +78,18 @@ def on_invitation_used(sender, request: HttpRequest, invitation: Invitation, **_
@receiver(password_changed)
# pylint: disable=unused-argument
def on_password_changed(sender, user: User, password: str, **_):
"""Log password change"""
Event.new(EventAction.PASSWORD_SET).from_http(None, user=user)
@receiver(post_save, sender=Event)
# pylint: disable=unused-argument
def event_post_save_notification(sender, instance: Event, **_):
"""Start task to check if any policies trigger an notification on this event"""
event_notification_handler.delay(instance.event_uuid.hex)
@receiver(pre_delete, sender=User)
# pylint: disable=unused-argument
def event_user_pre_delete_cleanup(sender, instance: User, **_):
"""If gdpr_compliance is enabled, remove all the user's events"""
gdpr_cleanup.delay(instance.pk)

View file

@ -210,7 +210,6 @@ class FlowViewSet(UsedByMixin, ModelViewSet):
},
)
@action(detail=True, pagination_class=None, filter_backends=[])
# pylint: disable=unused-argument
def export(self, request: Request, slug: str) -> Response:
"""Export flow to .yaml file"""
flow = self.get_object()
@ -221,7 +220,6 @@ class FlowViewSet(UsedByMixin, ModelViewSet):
@extend_schema(responses={200: FlowDiagramSerializer()})
@action(detail=True, pagination_class=None, filter_backends=[], methods=["get"])
# pylint: disable=unused-argument
def diagram(self, request: Request, slug: str) -> Response:
"""Return diagram for flow with slug `slug`, in the format used by flowchart.js"""
diagram = FlowDiagram(self.get_object(), request.user)
@ -245,7 +243,6 @@ class FlowViewSet(UsedByMixin, ModelViewSet):
methods=["POST"],
parser_classes=(MultiPartParser,),
)
# pylint: disable=unused-argument
def set_background(self, request: Request, slug: str):
"""Set Flow background"""
flow: Flow = self.get_object()
@ -265,7 +262,6 @@ class FlowViewSet(UsedByMixin, ModelViewSet):
filter_backends=[],
methods=["POST"],
)
# pylint: disable=unused-argument
def set_background_url(self, request: Request, slug: str):
"""Set Flow background (as URL)"""
flow: Flow = self.get_object()
@ -278,7 +274,6 @@ class FlowViewSet(UsedByMixin, ModelViewSet):
},
)
@action(detail=True, pagination_class=None, filter_backends=[])
# pylint: disable=unused-argument
def execute(self, request: Request, slug: str):
"""Execute flow for current user"""
# Because we pre-plan the flow here, and not in the planner, we need to manually clear

View file

@ -19,7 +19,6 @@ LOGGER = get_logger()
class StageMarker:
"""Base stage marker class, no extra attributes, and has no special handler."""
# pylint: disable=unused-argument
def process(
self,
plan: "FlowPlan",

View file

@ -19,7 +19,6 @@ def delete_cache_prefix(prefix: str) -> int:
@receiver(monitoring_set)
# pylint: disable=unused-argument
def monitoring_set_flows(sender, **kwargs):
"""set flow gauges"""
GAUGE_FLOWS_CACHED.set(len(cache.keys(f"{CACHE_PREFIX}*") or []))
@ -27,7 +26,6 @@ def monitoring_set_flows(sender, **kwargs):
@receiver(post_save)
@receiver(pre_delete)
# pylint: disable=unused-argument
def invalidate_flow_cache(sender, instance, **_):
"""Invalidate flow cache when flow is updated"""
from authentik.flows.models import Flow, FlowStageBinding, Stage

View file

@ -91,7 +91,6 @@ class ChallengeStageView(StageView):
)
return HttpChallengeResponse(challenge)
# pylint: disable=unused-argument
def post(self, request: Request, *args, **kwargs) -> HttpResponse:
"""Handle challenge response"""
challenge: ChallengeResponse = self.get_response_instance(data=request.data)

View file

@ -166,7 +166,7 @@ class FlowExecutorView(APIView):
self._logger.debug("f(exec): restored flow plan from token", plan=plan)
return plan
# pylint: disable=unused-argument, too-many-return-statements
# pylint: disable=too-many-return-statements
def dispatch(self, request: HttpRequest, flow_slug: str) -> HttpResponse:
with Hub.current.start_span(
op="authentik.flow.executor.dispatch", description=self.flow.slug

View file

@ -47,7 +47,6 @@ class FlowInspectorPlanSerializer(PassiveSerializer):
"""Get the plan's context, sanitized"""
return sanitize_dict(plan.context)
# pylint: disable=unused-argument
def get_session_id(self, plan: FlowPlan) -> str:
"""Get a unique session ID"""
request: Request = self.context["request"]

View file

@ -159,7 +159,6 @@ class BaseEvaluator:
raise exc
return result
# pylint: disable=unused-argument
def handle_error(self, exc: Exception, expression_source: str): # pragma: no cover
"""Exception Handler"""
LOGGER.warning("Expression error", exc=exc)

View file

@ -3,7 +3,6 @@ from logging import Logger
from os import getpid
# pylint: disable=unused-argument
def add_process_id(logger: Logger, method_name: str, event_dict):
"""Add the current process ID"""
event_dict["pid"] = getpid()

View file

@ -148,7 +148,6 @@ class OutpostViewSet(UsedByMixin, ModelViewSet):
@extend_schema(responses={200: OutpostHealthSerializer(many=True)})
@action(methods=["GET"], detail=True, pagination_class=None)
# pylint: disable=invalid-name, unused-argument
def health(self, request: Request, pk: int) -> Response:
"""Get outposts current health"""
outpost: Outpost = self.get_object()

View file

@ -91,7 +91,6 @@ class ServiceConnectionViewSet(
@extend_schema(responses={200: ServiceConnectionStateSerializer(many=False)})
@action(detail=True, pagination_class=None, filter_backends=[])
# pylint: disable=unused-argument, invalid-name
def state(self, request: Request, pk: str) -> Response:
"""Get the service connection's state"""
connection = self.get_object()

View file

@ -69,7 +69,6 @@ class OutpostConsumer(AuthJsonConsumer):
self.outpost = outpost
self.last_uid = self.channel_name
# pylint: disable=unused-argument
def disconnect(self, code):
if self.outpost and self.last_uid:
state = OutpostState.for_instance_uid(self.outpost, self.last_uid)
@ -127,7 +126,6 @@ class OutpostConsumer(AuthJsonConsumer):
response = WebsocketMessage(instruction=WebsocketMessageInstruction.ACK)
self.send_json(asdict(response))
# pylint: disable=unused-argument
def event_update(self, event): # pragma: no cover
"""Event handler which is called by post_save signals, Send update instruction"""
self.send_json(

View file

@ -23,7 +23,6 @@ UPDATE_TRIGGERING_MODELS = (
@receiver(pre_save, sender=Outpost)
# pylint: disable=unused-argument
def pre_save_outpost(sender, instance: Outpost, **_):
"""Pre-save checks for an outpost, if the name or config.kubernetes_namespace changes,
we call down and then wait for the up after save"""
@ -43,7 +42,6 @@ def pre_save_outpost(sender, instance: Outpost, **_):
@receiver(m2m_changed, sender=Outpost.providers.through)
# pylint: disable=unused-argument
def m2m_changed_update(sender, instance: Model, action: str, **_):
"""Update outpost on m2m change, when providers are added or removed"""
if action in ["post_add", "post_remove", "post_clear"]:
@ -51,7 +49,6 @@ def m2m_changed_update(sender, instance: Model, action: str, **_):
@receiver(post_save)
# pylint: disable=unused-argument
def post_save_update(sender, instance: Model, created: bool, **_):
"""If an Outpost is saved, Ensure that token is created/updated
@ -70,7 +67,6 @@ def post_save_update(sender, instance: Model, created: bool, **_):
@receiver(pre_delete, sender=Outpost)
# pylint: disable=unused-argument
def pre_delete_cleanup(sender, instance: Outpost, **_):
"""Ensure that Outpost's user is deleted (which will delete the token through cascade)"""
instance.user.delete()

View file

@ -144,7 +144,6 @@ class PolicyViewSet(
},
)
@action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
# pylint: disable=unused-argument, invalid-name
def test(self, request: Request, pk: str) -> Response:
"""Test policy"""
policy = self.get_object()

View file

@ -37,7 +37,6 @@ def update_score(request: HttpRequest, identifier: str, amount: int):
@receiver(login_failed)
# pylint: disable=unused-argument
def handle_failed_login(sender, request, credentials, **_):
"""Lower Score for failed login attempts"""
if "username" in credentials:
@ -45,14 +44,12 @@ def handle_failed_login(sender, request, credentials, **_):
@receiver(identification_failed)
# pylint: disable=unused-argument
def handle_identification_failed(sender, request, uid_field: str, **_):
"""Lower Score for failed identification attempts"""
update_score(request, uid_field, -1)
@receiver(user_logged_in)
# pylint: disable=unused-argument
def handle_successful_login(sender, request, user, **_):
"""Raise score for successful attempts"""
update_score(request, user.username, 1)

View file

@ -13,14 +13,12 @@ LOGGER = get_logger()
@receiver(monitoring_set)
# pylint: disable=unused-argument
def monitoring_set_policies(sender, **kwargs):
"""set policy gauges"""
GAUGE_POLICIES_CACHED.set(len(cache.keys(f"{CACHE_PREFIX}_*") or []))
@receiver(post_save)
# pylint: disable=unused-argument
def invalidate_policy_cache(sender, instance, **_):
"""Invalidate Policy cache when policy is updated"""
from authentik.policies.models import Policy, PolicyBinding

View file

@ -83,7 +83,6 @@ class OAuth2ProviderViewSet(UsedByMixin, ModelViewSet):
}
)
@action(methods=["GET"], detail=True)
# pylint: disable=invalid-name
def setup_urls(self, request: Request, pk: int) -> str:
"""Get Providers setup URLs"""
provider = get_object_or_404(OAuth2Provider, pk=pk)
@ -140,7 +139,6 @@ class OAuth2ProviderViewSet(UsedByMixin, ModelViewSet):
},
)
@action(detail=True, methods=["GET"])
# pylint: disable=invalid-name, unused-argument
def preview_user(self, request: Request, pk: int) -> Response:
"""Preview user data for provider"""
provider: OAuth2Provider = self.get_object()

View file

@ -318,7 +318,6 @@ class AuthorizationFlowInitView(PolicyAccessView):
request.context["oauth_response_type"] = self.params.response_type
return request
# pylint: disable=unused-argument
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
"""Start FlowPLanner, return to flow executor shell"""
# After we've checked permissions, and the user has access, check if we need
@ -429,7 +428,6 @@ class OAuthFulfillmentStage(StageView):
"""Wrapper when this stage gets hit with a post request"""
return self.get(request, *args, **kwargs)
# pylint: disable=unused-argument
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
"""final Stage of an OAuth2 Flow"""
if PLAN_CONTEXT_PARAMS not in self.executor.plan.context:

View file

@ -73,7 +73,6 @@ class GitHubUserView(View):
class GitHubUserTeamsView(View):
"""Emulate GitHub's /user/teams API Endpoint"""
# pylint: disable=unused-argument
def get(self, request: HttpRequest, token: RefreshToken) -> HttpResponse:
"""Emulate GitHub's /user/teams API Endpoint"""
user = token.user

View file

@ -144,7 +144,6 @@ class ProviderInfoView(View):
default_claims.extend(value.keys())
return default_claims
# pylint: disable=unused-argument
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
"""OpenID-compliant Provider Info"""
return JsonResponse(self.get_info(self.provider), json_dumps_params={"indent": 2})

View file

@ -209,7 +209,6 @@ class SAMLProviderViewSet(UsedByMixin, ModelViewSet):
],
)
@action(methods=["GET"], detail=True, permission_classes=[AllowAny])
# pylint: disable=invalid-name, unused-argument
def metadata(self, request: Request, pk: int) -> Response:
"""Return metadata as XML string"""
# We don't use self.get_object() on purpose as this view is un-authenticated
@ -282,7 +281,6 @@ class SAMLProviderViewSet(UsedByMixin, ModelViewSet):
},
)
@action(detail=True, methods=["GET"])
# pylint: disable=invalid-name, unused-argument
def preview_user(self, request: Request, pk: int) -> Response:
"""Preview user data for provider"""
provider: SAMLProvider = self.get_object()

View file

@ -35,7 +35,6 @@ ERROR_FAILED_TO_VERIFY = "Failed to verify signature"
class AuthNRequest:
"""AuthNRequest Dataclass"""
# pylint: disable=invalid-name
id: Optional[str] = None
relay_state: Optional[str] = None

View file

@ -16,7 +16,6 @@ from authentik.sources.saml.processors.constants import NS_SAML_PROTOCOL
class LogoutRequest:
"""Logout Request"""
# pylint: disable=invalid-name
id: Optional[str] = None
issuer: Optional[str] = None

View file

@ -39,7 +39,6 @@ class SAMLSLOView(PolicyAccessView):
"""Handler to verify the SAML Request. Must be implemented by a subclass"""
raise NotImplementedError
# pylint: disable=unused-argument
def get(self, request: HttpRequest, application_slug: str) -> HttpResponse:
"""Verify the SAML Request, and if valid initiate the FlowPlanner for the application"""
# Call the method handler, which checks the SAML

View file

@ -50,7 +50,6 @@ class SAMLSSOView(PolicyAccessView):
"""Handler to verify the SAML Request. Must be implemented by a subclass"""
raise NotImplementedError
# pylint: disable=unused-argument
def get(self, request: HttpRequest, application_slug: str) -> HttpResponse:
"""Verify the SAML Request, and if valid initiate the FlowPlanner for the application"""
# Call the method handler, which checks the SAML

View file

@ -30,14 +30,12 @@ CELERY_APP = Celery("authentik")
CTX_TASK_ID = ContextVar(STRUCTLOG_KEY_PREFIX + "task_id", default=Ellipsis)
# pylint: disable=unused-argument
@setup_logging.connect
def config_loggers(*args, **kwargs):
"""Apply logging settings from settings.py to celery"""
dictConfig(settings.LOGGING)
# pylint: disable=unused-argument
@after_task_publish.connect
def after_task_publish_hook(sender=None, headers=None, body=None, **kwargs):
"""Log task_id after it was published"""
@ -45,7 +43,6 @@ def after_task_publish_hook(sender=None, headers=None, body=None, **kwargs):
LOGGER.info("Task published", task_id=info.get("id", ""), task_name=info.get("task", ""))
# pylint: disable=unused-argument
@task_prerun.connect
def task_prerun_hook(task_id: str, task, *args, **kwargs):
"""Log task_id on worker"""
@ -54,7 +51,6 @@ def task_prerun_hook(task_id: str, task, *args, **kwargs):
LOGGER.info("Task started", task_id=task_id, task_name=task.__name__)
# pylint: disable=unused-argument
@task_postrun.connect
def task_postrun_hook(task_id, task, *args, retval=None, state=None, **kwargs):
"""Log task_id on worker"""
@ -62,7 +58,6 @@ def task_postrun_hook(task_id, task, *args, retval=None, state=None, **kwargs):
LOGGER.info("Task finished", task_id=task_id, task_name=task.__name__, state=state)
# pylint: disable=unused-argument
@task_failure.connect
@task_internal_error.connect
def task_error_hook(task_id, exception: Exception, traceback, *args, **kwargs):

View file

@ -18,7 +18,6 @@ class MessageConsumer(JsonWebsocketConsumer):
return
cache.set(f"{CACHE_PREFIX}{self.session_key}_messages_{self.channel_name}", True, None)
# pylint: disable=unused-argument
def disconnect(self, code):
cache.delete(f"{CACHE_PREFIX}{self.session_key}_messages_{self.channel_name}")

View file

@ -100,7 +100,6 @@ class LDAPSourceViewSet(UsedByMixin, ModelViewSet):
}
)
@action(methods=["GET"], detail=True, pagination_class=None, filter_backends=[])
# pylint: disable=unused-argument
def sync_status(self, request: Request, slug: str) -> Response:
"""Get source's sync status"""
source = self.get_object()

View file

@ -22,7 +22,6 @@ from authentik.stages.prompt.signals import password_validate
@receiver(post_save, sender=LDAPSource)
# pylint: disable=unused-argument
def sync_ldap_source_on_save(sender, instance: LDAPSource, **_):
"""Ensure that source is synced on save (if enabled)"""
if not instance.enabled:
@ -42,7 +41,6 @@ def sync_ldap_source_on_save(sender, instance: LDAPSource, **_):
@receiver(password_validate)
# pylint: disable=unused-argument
def ldap_password_validate(sender, password: str, plan_context: dict[str, Any], **__):
"""if there's an LDAP Source with enabled password sync, check the password"""
sources = LDAPSource.objects.filter(sync_users_password=True)
@ -59,7 +57,6 @@ def ldap_password_validate(sender, password: str, plan_context: dict[str, Any],
@receiver(password_changed)
# pylint: disable=unused-argument
def ldap_sync_password(sender, user: User, password: str, **_):
"""Connect to ldap and update password."""
sources = LDAPSource.objects.filter(sync_users_password=True)

View file

@ -118,7 +118,6 @@ class OAuthSourceFilter(FilterSet):
has_jwks = BooleanFilter(label="Only return sources with JWKS data", method="filter_has_jwks")
# pylint: disable=unused-argument
def filter_has_jwks(self, queryset, name, value): # pragma: no cover
"""Only return sources with JWKS data"""
return queryset.exclude(oidc_jwks__iexact="{}")

View file

@ -41,7 +41,6 @@ class SourceType:
"""Get Icon URL for login"""
return static(f"authentik/sources/{self.slug}.svg")
# pylint: disable=unused-argument
def login_challenge(self, source: OAuthSource, request: HttpRequest) -> Challenge:
"""Allow types to return custom challenges"""
return RedirectChallenge(

View file

@ -70,12 +70,10 @@ class OAuthCallback(OAuthClientMixin, View):
access_token=self.token.get("access_token"),
)
# pylint: disable=unused-argument
def get_callback_url(self, source: OAuthSource) -> str:
"Return callback url if different than the current url."
return ""
# pylint: disable=unused-argument
def get_error_redirect(self, source: OAuthSource, reason: str) -> str:
"Return url to redirect on login failure."
return settings.LOGIN_URL
@ -87,7 +85,6 @@ class OAuthCallback(OAuthClientMixin, View):
"""Create a dict of User data"""
raise NotImplementedError()
# pylint: disable=unused-argument
def get_user_id(self, info: dict[str, Any]) -> Optional[str]:
"""Return unique identifier from the profile info."""
if "id" in info:

View file

@ -18,7 +18,6 @@ class OAuthRedirect(OAuthClientMixin, RedirectView):
permanent = False
params = None
# pylint: disable=unused-argument
def get_additional_parameters(self, source: OAuthSource) -> dict[str, Any]:
"Return additional redirect parameters for this source."
return self.params or {}

View file

@ -66,7 +66,6 @@ class SAMLSourceViewSet(UsedByMixin, ModelViewSet):
@extend_schema(responses={200: SAMLMetadataSerializer(many=False)})
@action(methods=["GET"], detail=True)
# pylint: disable=unused-argument
def metadata(self, request: Request, slug: str) -> Response:
"""Return metadata as XML string"""
source = self.get_object()

View file

@ -10,7 +10,6 @@ LOGGER = get_logger()
@receiver(user_logged_out)
# pylint: disable=unused-argument
def on_user_logged_out(sender, request: HttpRequest, user: User, **_):
"""Delete temporary user if the `delete_on_logout` flag is enabled"""
if not user:

View file

@ -78,7 +78,6 @@ class AuthenticatorDuoStageViewSet(UsedByMixin, ModelViewSet):
},
)
@action(methods=["POST"], detail=True, permission_classes=[])
# pylint: disable=invalid-name,unused-argument
def enrollment_status(self, request: Request, pk: str) -> Response:
"""Check enrollment status of user details in current session"""
stage: AuthenticatorDuoStage = AuthenticatorDuoStage.objects.filter(pk=pk).first()
@ -108,7 +107,6 @@ class AuthenticatorDuoStageViewSet(UsedByMixin, ModelViewSet):
},
)
@action(methods=["POST"], detail=True)
# pylint: disable=invalid-name,unused-argument
def import_device_manual(self, request: Request, pk: str) -> Response:
"""Import duo devices into authentik"""
stage: AuthenticatorDuoStage = self.get_object()
@ -150,7 +148,6 @@ class AuthenticatorDuoStageViewSet(UsedByMixin, ModelViewSet):
},
)
@action(methods=["POST"], detail=True)
# pylint: disable=invalid-name,unused-argument
def import_devices_automatic(self, request: Request, pk: str) -> Response:
"""Import duo devices into authentik"""
stage: AuthenticatorDuoStage = self.get_object()

View file

@ -7,7 +7,6 @@ from authentik.events.models import Event
@receiver(pre_delete, sender=StaticDevice)
# pylint: disable=unused-argument
def pre_delete_event(sender, instance: StaticDevice, **_):
"""Create event before deleting Static Devices"""
# Create event with email notification

View file

@ -125,7 +125,6 @@ def validate_challenge_code(code: str, stage_view: StageView, user: User) -> Dev
return device
# pylint: disable=unused-argument
def validate_challenge_webauthn(data: dict, stage_view: StageView, user: User) -> Device:
"""Validate WebAuthn Challenge"""
request = stage_view.request

View file

@ -374,7 +374,6 @@ class AuthenticatorValidateStageView(ChallengeStageView):
)
return response
# pylint: disable=unused-argument
def challenge_valid(self, response: AuthenticatorValidationChallengeResponse) -> HttpResponse:
# All validation is done by the serializer
user = self.executor.plan.context.get(PLAN_CONTEXT_PENDING_USER)

View file

@ -134,7 +134,6 @@ class PromptChallengeResponse(ChallengeResponse):
def username_field_validator_factory() -> Callable[[PromptChallenge, str], Any]:
"""Return a `clean_` method for `field`. Clean method checks if username is taken already."""
# pylint: disable=unused-argument
def username_field_validator(self: PromptChallenge, value: str) -> Any:
"""Check for duplicate usernames"""
if User.objects.filter(username=value).exists():

View file

@ -117,7 +117,6 @@ class TenantViewSet(UsedByMixin, ModelViewSet):
responses=CurrentTenantSerializer(many=False),
)
@action(methods=["GET"], detail=False, permission_classes=[AllowAny])
# pylint: disable=invalid-name, unused-argument
def current(self, request: Request) -> Response:
"""Get current tenant"""
tenant: Tenant = request._request.tenant

View file

@ -82,7 +82,7 @@ else:
workers = int(CONFIG.y("web.workers", default_workers))
threads = int(CONFIG.y("web.threads", 4))
# pylint: disable=unused-argument
def post_fork(server: "Arbiter", worker: DjangoUvicornWorker):
"""Tell prometheus to use worker number instead of process ID for multiprocess"""
from prometheus_client import values
@ -90,7 +90,6 @@ def post_fork(server: "Arbiter", worker: DjangoUvicornWorker):
values.ValueClass = MultiProcessValue(lambda: worker._worker_id)
# pylint: disable=unused-argument
def worker_exit(server: "Arbiter", worker: DjangoUvicornWorker):
"""Remove pid dbs when worker is shutdown"""
from prometheus_client import multiprocess

View file

@ -62,10 +62,19 @@ exclude_lines = [
]
show_missing = true
[tool.pylint.basic]
good-names = [
"pk",
"id",
"i",
"j",
"k",
"_",
]
[tool.pylint.master]
disable = [
"arguments-differ",
"fixme",
"locally-disabled",
"too-many-ancestors",
"too-few-public-methods",
@ -74,6 +83,7 @@ disable = [
"similarities",
"cyclic-import",
"protected-access",
"unused-argument",
"raise-missing-from",
# To preserve django's translation function we need to use %-formatting
"consider-using-f-string",

View file

@ -65,18 +65,20 @@ class OutpostDockerTests(ChannelsLiveServerTestCase):
external_host="http://localhost",
authorization_flow=create_test_flow(),
)
authentication_kp = CertificateKeyPair.objects.create(
name="docker-authentication",
# pylint: disable=consider-using-with
certificate_data=open(f"{self.ssl_folder}/client/cert.pem", encoding="utf8").read(),
# pylint: disable=consider-using-with
key_data=open(f"{self.ssl_folder}/client/key.pem", encoding="utf8").read(),
)
verification_kp = CertificateKeyPair.objects.create(
name="docker-verification",
# pylint: disable=consider-using-with
certificate_data=open(f"{self.ssl_folder}/client/ca.pem", encoding="utf8").read(),
)
with (
open(f"{self.ssl_folder}/client/cert.pem", encoding="utf8") as cert,
open(f"{self.ssl_folder}/client/key.pem", encoding="utf8") as key,
):
authentication_kp = CertificateKeyPair.objects.create(
name="docker-authentication",
certificate_data=cert.read(),
key_data=key.read(),
)
with open(f"{self.ssl_folder}/client/ca.pem", encoding="utf8") as authority:
verification_kp = CertificateKeyPair.objects.create(
name="docker-verification",
certificate_data=authority.read(),
)
self.service_connection = DockerServiceConnection.objects.create(
url="https://localhost:2376",
tls_verification=verification_kp,

View file

@ -65,18 +65,20 @@ class TestProxyDocker(ChannelsLiveServerTestCase):
external_host="http://localhost",
authorization_flow=create_test_flow(),
)
authentication_kp = CertificateKeyPair.objects.create(
name="docker-authentication",
# pylint: disable=consider-using-with
certificate_data=open(f"{self.ssl_folder}/client/cert.pem", encoding="utf8").read(),
# pylint: disable=consider-using-with
key_data=open(f"{self.ssl_folder}/client/key.pem", encoding="utf8").read(),
)
verification_kp = CertificateKeyPair.objects.create(
name="docker-verification",
# pylint: disable=consider-using-with
certificate_data=open(f"{self.ssl_folder}/client/ca.pem", encoding="utf8").read(),
)
with (
open(f"{self.ssl_folder}/client/cert.pem", encoding="utf8") as cert,
open(f"{self.ssl_folder}/client/key.pem", encoding="utf8") as key,
):
authentication_kp = CertificateKeyPair.objects.create(
name="docker-authentication",
certificate_data=cert.read(),
key_data=key.read(),
)
with open(f"{self.ssl_folder}/client/ca.pem", encoding="utf8") as authority:
verification_kp = CertificateKeyPair.objects.create(
name="docker-verification",
certificate_data=authority.read(),
)
self.service_connection = DockerServiceConnection.objects.create(
url="https://localhost:2376",
tls_verification=verification_kp,