api_v1 #15

Merged
cayop merged 8 commits from api_v1 into main 2024-10-24 10:00:11 +00:00
2 changed files with 110 additions and 72 deletions
Showing only changes of commit 9b96955c30 - Show all commits

View file

@ -6,7 +6,7 @@ from django.urls import path
app_name = 'api'
urlpatterns = [
path('v1/snapshot/', views.NewSnapshot, name='new_snapshot'),
path('v1/snapshot/', views.NewSnapshotView.as_view(), name='new_snapshot'),
path('v1/tokens/', views.TokenView.as_view(), name='tokens'),
path('v1/tokens/new', views.TokenNewView.as_view(), name='new_token'),
path("v1/tokens/<int:pk>/edit", views.EditTokenView.as_view(), name="edit_token"),

View file

@ -9,6 +9,7 @@ from django.http import JsonResponse
from django.shortcuts import get_object_or_404, redirect
from django.utils.translation import gettext_lazy as _
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from django_tables2 import SingleTableView
from django.views.generic.edit import (
CreateView,
@ -17,6 +18,7 @@ from django.views.generic.edit import (
)
from utils.save_snapshots import move_json, save_in_disk
from django.views.generic.edit import View
from dashboard.mixins import DashboardView
from evidence.models import Annotation
from evidence.parse import Build
@ -27,87 +29,105 @@ from api.tables import TokensTable
logger = logging.getLogger('django')
@csrf_exempt
def NewSnapshot(request):
# Accept only posts
if request.method != 'POST':
return JsonResponse({'error': 'Invalid request method'}, status=400)
class ApiMixing(View):
# Authentication
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
logger.exception("Invalid or missing token {}".format(auth_header))
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
token = auth_header.split(' ')[1].strip("'").strip('"')
try:
uuid.UUID(token)
except Exception:
logger.exception("Invalid token {}".format(token))
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
tk = Token.objects.filter(token=token).first()
if not tk:
logger.exception("Invalid or missing token {}".format(token))
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
@method_decorator(csrf_exempt)
def dispatch(self, *args, **kwargs):
return super().dispatch(*args, **kwargs)
# Validation snapshot
try:
data = json.loads(request.body)
except json.JSONDecodeError:
logger.exception("Invalid Snapshot of user {}".format(tk.owner))
return JsonResponse({'error': 'Invalid JSON'}, status=500)
def auth(self):
# Authentication
auth_header = self.request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
logger.exception("Invalid or missing token {}".format(auth_header))
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
# try:
# Build(data, None, check=True)
# except Exception:
# return JsonResponse({'error': 'Invalid Snapshot'}, status=400)
token = auth_header.split(' ')[1].strip("'").strip('"')
try:
uuid.UUID(token)
except Exception:
logger.exception("Invalid token {}".format(token))
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
exist_annotation = Annotation.objects.filter(
uuid=data['uuid']
).first()
self.tk = Token.objects.filter(token=token).first()
if exist_annotation:
txt = "error: the snapshot {} exist".format(data['uuid'])
logger.exception(txt)
return JsonResponse({'status': txt}, status=500)
# Process snapshot
path_name = save_in_disk(data, tk.owner.institution.name)
try:
Build(data, tk.owner)
except Exception as err:
logger.exception(err)
return JsonResponse({'status': f"fail: {err}"}, status=500)
annotation = Annotation.objects.filter(
uuid=data['uuid'],
type=Annotation.Type.SYSTEM,
# TODO this is hardcoded, it should select the user preferred algorithm
key="hidalgo1",
owner=tk.owner.institution
).first()
if not self.tk:
logger.exception("Invalid or missing token {}".format(token))
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
if not annotation:
logger.exception("Error: No annotation for uuid: {}".format(data["uuid"]))
return JsonResponse({'status': 'fail'}, status=500)
class NewSnapshotView(ApiMixing):
url_args = reverse_lazy("device:details", args=(annotation.value,))
url = request.build_absolute_uri(url_args)
def get(self, request, *args, **kwargs):
return JsonResponse({}, status=404)
response = {
"status": "success",
"dhid": annotation.value[:6].upper(),
"url": url,
# TODO replace with public_url when available
"public_url": url
}
move_json(path_name, tk.owner.institution.name)
def post(self, request, *args, **kwargs):
response = self.auth()
if response:
return response
return JsonResponse(response, status=200)
# Validation snapshot
try:
data = json.loads(request.body)
except json.JSONDecodeError:
logger.exception("Invalid Snapshot of user {}".format(self.tk.owner))
return JsonResponse({'error': 'Invalid JSON'}, status=500)
# Process snapshot
path_name = save_in_disk(data, self.tk.owner.institution.name)
# try:
# Build(data, None, check=True)
# except Exception:
# return JsonResponse({'error': 'Invalid Snapshot'}, status=400)
if not data.get("uuid"):
txt = "error: the snapshot not have uuid"
logger.exception(txt)
return JsonResponse({'status': txt}, status=500)
exist_annotation = Annotation.objects.filter(
uuid=data['uuid']
).first()
if exist_annotation:
txt = "error: the snapshot {} exist".format(data['uuid'])
logger.exception(txt)
return JsonResponse({'status': txt}, status=500)
try:
Build(data, self.tk.owner)
except Exception as err:
logger.exception(err)
return JsonResponse({'status': f"fail: {err}"}, status=500)
annotation = Annotation.objects.filter(
uuid=data['uuid'],
type=Annotation.Type.SYSTEM,
# TODO this is hardcoded, it should select the user preferred algorithm
key="hidalgo1",
owner=self.tk.owner.institution
).first()
if not annotation:
logger.exception("Error: No annotation for uuid: {}".format(data["uuid"]))
return JsonResponse({'status': 'fail'}, status=500)
url_args = reverse_lazy("device:details", args=(annotation.value,))
url = request.build_absolute_uri(url_args)
response = {
"status": "success",
"dhid": annotation.value[:6].upper(),
"url": url,
# TODO replace with public_url when available
"public_url": url
}
move_json(path_name, self.tk.owner.institution.name)
return JsonResponse(response, status=200)
class TokenView(DashboardView, SingleTableView):
@ -183,3 +203,21 @@ class EditTokenView(DashboardView, UpdateView):
)
kwargs = super().get_form_kwargs()
return kwargs
class DetailsComputerView(ApiMixing):
def get(self, request, *args, **kwargs):
response = self.auth()
if response:
return response
try:
data = json.loads(request.body)
except:
pass
return JsonResponse({}, status=404)
def post(self, request, *args, **kwargs):
return JsonResponse({}, status=404)