sources/plex: add background task to monitor validity of plex token

closes #1205

Signed-off-by: Jens Langhammer <jens.langhammer@beryju.org>
This commit is contained in:
Jens Langhammer 2021-08-01 12:24:52 +02:00
parent 16e6e4c3b7
commit a97f842112
3 changed files with 71 additions and 0 deletions

View File

@ -0,0 +1,10 @@
"""Plex source settings"""
from celery.schedules import crontab
CELERY_BEAT_SCHEDULE = {
"check_plex_token": {
"task": "authentik.sources.plex.tasks.check_plex_token_all",
"schedule": crontab(minute="31", hour="*/3"),
"options": {"queue": "authentik_scheduled"},
},
}

View File

@ -0,0 +1,43 @@
"""Plex tasks"""
from requests import RequestException
from authentik.events.models import Event, EventAction
from authentik.events.monitored_tasks import MonitoredTask, TaskResult, TaskResultStatus
from authentik.root.celery import CELERY_APP
from authentik.sources.plex.models import PlexSource
from authentik.sources.plex.plex import PlexAuth
@CELERY_APP.task()
def check_plex_token_all():
"""Check plex token for all plex sources"""
for source in PlexSource.objects.all():
check_plex_token.delay(source.slug)
@CELERY_APP.task(bind=True, base=MonitoredTask)
def check_plex_token(self: MonitoredTask, source_slug: int):
"""Check the validity of a Plex source."""
sources = PlexSource.objects.filter(slug=source_slug)
if not sources.exists():
return
source: PlexSource = sources.first()
self.set_uid(source.slug)
auth = PlexAuth(source, source.plex_token)
try:
auth.get_user_info()
self.set_status(
TaskResult(TaskResultStatus.SUCCESSFUL, ["Plex token is valid."])
)
except RequestException as exc:
self.set_status(
TaskResult(
TaskResultStatus.ERROR,
["Plex token is invalid/an error occurred:", str(exc)],
)
)
Event.new(
EventAction.CONFIGURATION_ERROR,
message=f"Plex token invalid, please re-authenticate source.\n{str(exc)}",
source=source,
).save()

View File

@ -1,10 +1,13 @@
"""plex Source tests""" """plex Source tests"""
from django.test import TestCase from django.test import TestCase
from requests.exceptions import RequestException
from requests_mock import Mocker from requests_mock import Mocker
from authentik.events.models import Event, EventAction
from authentik.providers.oauth2.generators import generate_client_secret from authentik.providers.oauth2.generators import generate_client_secret
from authentik.sources.plex.models import PlexSource from authentik.sources.plex.models import PlexSource
from authentik.sources.plex.plex import PlexAuth from authentik.sources.plex.plex import PlexAuth
from authentik.sources.plex.tasks import check_plex_token_all
USER_INFO_RESPONSE = { USER_INFO_RESPONSE = {
"id": 1234123419, "id": 1234123419,
@ -62,3 +65,18 @@ class TestPlexSource(TestCase):
with Mocker() as mocker: with Mocker() as mocker:
mocker.get("https://plex.tv/api/v2/resources", json=RESOURCES_RESPONSE) mocker.get("https://plex.tv/api/v2/resources", json=RESOURCES_RESPONSE)
self.assertTrue(api.check_server_overlap()) self.assertTrue(api.check_server_overlap())
def test_check_task(self):
"""Test token check task"""
with Mocker() as mocker:
mocker.get("https://plex.tv/api/v2/user", json=USER_INFO_RESPONSE)
check_plex_token_all()
self.assertFalse(
Event.objects.filter(action=EventAction.CONFIGURATION_ERROR).exists()
)
with Mocker() as mocker:
mocker.get("https://plex.tv/api/v2/user", exc=RequestException())
check_plex_token_all()
self.assertTrue(
Event.objects.filter(action=EventAction.CONFIGURATION_ERROR).exists()
)