Compare commits

...

2 Commits

4 changed files with 159 additions and 80 deletions

View File

@ -6,7 +6,8 @@ from django.urls import path
app_name = 'api'
urlpatterns = [
path('v1/snapshot/', views.NewSnapshot, name='new_snapshot'),
path('v1/snapshot/', views.NewSnapshotView.as_view(), name='new_snapshot'),
path('v1/device/<str:pk>/', views.DetailsDeviceView.as_view(), name='device'),
path('v1/tokens/', views.TokenView.as_view(), name='tokens'),
path('v1/tokens/new', views.TokenNewView.as_view(), name='new_token'),
path("v1/tokens/<int:pk>/edit", views.EditTokenView.as_view(), name="edit_token"),

View File

@ -9,6 +9,7 @@ from django.http import JsonResponse
from django.shortcuts import get_object_or_404, redirect
from django.utils.translation import gettext_lazy as _
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from django_tables2 import SingleTableView
from django.views.generic.edit import (
CreateView,
@ -17,6 +18,7 @@ from django.views.generic.edit import (
)
from utils.save_snapshots import move_json, save_in_disk
from django.views.generic.edit import View
from dashboard.mixins import DashboardView
from evidence.models import Annotation
from evidence.parse import Build
@ -27,87 +29,105 @@ from api.tables import TokensTable
logger = logging.getLogger('django')
@csrf_exempt
def NewSnapshot(request):
# Accept only posts
if request.method != 'POST':
return JsonResponse({'error': 'Invalid request method'}, status=400)
class ApiMixing(View):
# Authentication
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
logger.exception("Invalid or missing token {}".format(auth_header))
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
token = auth_header.split(' ')[1].strip("'").strip('"')
try:
uuid.UUID(token)
except Exception:
logger.exception("Invalid token {}".format(token))
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
tk = Token.objects.filter(token=token).first()
if not tk:
logger.exception("Invalid or missing token {}".format(token))
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
@method_decorator(csrf_exempt)
def dispatch(self, *args, **kwargs):
return super().dispatch(*args, **kwargs)
# Validation snapshot
try:
data = json.loads(request.body)
except json.JSONDecodeError:
logger.exception("Invalid Snapshot of user {}".format(tk.owner))
return JsonResponse({'error': 'Invalid JSON'}, status=500)
def auth(self):
# Authentication
auth_header = self.request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
logger.exception("Invalid or missing token {}".format(auth_header))
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
# try:
# Build(data, None, check=True)
# except Exception:
# return JsonResponse({'error': 'Invalid Snapshot'}, status=400)
token = auth_header.split(' ')[1].strip("'").strip('"')
try:
uuid.UUID(token)
except Exception:
logger.exception("Invalid token {}".format(token))
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
exist_annotation = Annotation.objects.filter(
uuid=data['uuid']
).first()
self.tk = Token.objects.filter(token=token).first()
if exist_annotation:
txt = "error: the snapshot {} exist".format(data['uuid'])
logger.exception(txt)
return JsonResponse({'status': txt}, status=500)
# Process snapshot
path_name = save_in_disk(data, tk.owner.institution.name)
try:
Build(data, tk.owner)
except Exception as err:
logger.exception(err)
return JsonResponse({'status': f"fail: {err}"}, status=500)
annotation = Annotation.objects.filter(
uuid=data['uuid'],
type=Annotation.Type.SYSTEM,
# TODO this is hardcoded, it should select the user preferred algorithm
key="hidalgo1",
owner=tk.owner.institution
).first()
if not self.tk:
logger.exception("Invalid or missing token {}".format(token))
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
if not annotation:
logger.exception("Error: No annotation for uuid: {}".format(data["uuid"]))
return JsonResponse({'status': 'fail'}, status=500)
class NewSnapshotView(ApiMixing):
url_args = reverse_lazy("device:details", args=(annotation.value,))
url = request.build_absolute_uri(url_args)
def get(self, request, *args, **kwargs):
return JsonResponse({}, status=404)
response = {
"status": "success",
"dhid": annotation.value[:6].upper(),
"url": url,
# TODO replace with public_url when available
"public_url": url
}
move_json(path_name, tk.owner.institution.name)
def post(self, request, *args, **kwargs):
response = self.auth()
if response:
return response
return JsonResponse(response, status=200)
# Validation snapshot
try:
data = json.loads(request.body)
except json.JSONDecodeError:
logger.exception("Invalid Snapshot of user {}".format(self.tk.owner))
return JsonResponse({'error': 'Invalid JSON'}, status=500)
# Process snapshot
path_name = save_in_disk(data, self.tk.owner.institution.name)
# try:
# Build(data, None, check=True)
# except Exception:
# return JsonResponse({'error': 'Invalid Snapshot'}, status=400)
if not data.get("uuid"):
txt = "error: the snapshot not have uuid"
logger.exception(txt)
return JsonResponse({'status': txt}, status=500)
exist_annotation = Annotation.objects.filter(
uuid=data['uuid']
).first()
if exist_annotation:
txt = "error: the snapshot {} exist".format(data['uuid'])
logger.exception(txt)
return JsonResponse({'status': txt}, status=500)
try:
Build(data, self.tk.owner)
except Exception as err:
logger.exception(err)
return JsonResponse({'status': f"fail: {err}"}, status=500)
annotation = Annotation.objects.filter(
uuid=data['uuid'],
type=Annotation.Type.SYSTEM,
# TODO this is hardcoded, it should select the user preferred algorithm
key="hidalgo1",
owner=self.tk.owner.institution
).first()
if not annotation:
logger.exception("Error: No annotation for uuid: {}".format(data["uuid"]))
return JsonResponse({'status': 'fail'}, status=500)
url_args = reverse_lazy("device:details", args=(annotation.value,))
url = request.build_absolute_uri(url_args)
response = {
"status": "success",
"dhid": annotation.value[:6].upper(),
"url": url,
# TODO replace with public_url when available
"public_url": url
}
move_json(path_name, self.tk.owner.institution.name)
return JsonResponse(response, status=200)
class TokenView(DashboardView, SingleTableView):
@ -183,3 +203,59 @@ class EditTokenView(DashboardView, UpdateView):
)
kwargs = super().get_form_kwargs()
return kwargs
class DetailsDeviceView(ApiMixing):
def get(self, request, *args, **kwargs):
response = self.auth()
if response:
return response
self.pk = kwargs['pk']
self.object = Device(id=self.pk)
if not self.object.last_evidence:
return JsonResponse({}, status=404)
if self.object.owner != self.tk.owner.institution:
return JsonResponse({}, status=403)
data = self.get_data()
return JsonResponse(data, status=200)
def post(self, request, *args, **kwargs):
return JsonResponse({}, status=404)
def get_data(self):
data = {}
self.object.initial()
self.object.get_last_evidence()
evidence = self.object.last_evidence
if evidence.is_legacy():
data.update({
"device": evidence.get("device"),
"components": evidence.get("components"),
})
else:
evidence.get_doc()
snapshot = ParseSnapshot(evidence.doc).snapshot_json
data.update({
"device": snapshot.get("device"),
"components": snapshot.get("components"),
})
uuids = Annotation.objects.filter(
owner=self.tk.owner.institution,
value=self.pk
).values("uuid")
annotations = Annotation.objects.filter(
uuid__in=uuids,
owner=self.tk.owner.institution,
type = Annotation.Type.USER
).values_list("key", "value")
data.update({"annotations": list(annotations)})
return data

View File

@ -1,8 +1,7 @@
from django.db import models, connection
from utils.constants import STR_SM_SIZE, STR_SIZE, STR_EXTEND_SIZE, ALGOS
from utils.constants import ALGOS
from evidence.models import Annotation, Evidence
from user.models import User
from lot.models import DeviceLot

View File

@ -3,7 +3,7 @@ import json
from dmidecode import DMIParse
from django.db import models
from utils.constants import STR_SM_SIZE, STR_EXTEND_SIZE, CHASSIS_DH
from utils.constants import STR_EXTEND_SIZE, CHASSIS_DH
from evidence.xapian import search
from evidence.parse_details import ParseSnapshot
from user.models import User, Institution
@ -67,7 +67,7 @@ class Evidence:
for xa in matches:
self.doc = json.loads(xa.document.get_data())
if self.doc.get("software") == "workbench-script":
if not self.is_legacy():
dmidecode_raw = self.doc["data"]["dmidecode"]
self.dmi = DMIParse(dmidecode_raw)
@ -80,7 +80,7 @@ class Evidence:
self.created = self.annotations.last().created
def get_components(self):
if self.doc.get("software") != "workbench-script":
if self.is_legacy():
return self.doc.get('components', [])
self.set_components()
return self.components
@ -92,7 +92,7 @@ class Evidence:
return ""
return list(self.doc.get('kv').values())[0]
if self.doc.get("software") != "workbench-script":
if self.is_legacy():
return self.doc['device']['manufacturer']
return self.dmi.manufacturer().strip()
@ -104,13 +104,13 @@ class Evidence:
return ""
return list(self.doc.get('kv').values())[1]
if self.doc.get("software") != "workbench-script":
if self.is_legacy():
return self.doc['device']['model']
return self.dmi.model().strip()
def get_chassis(self):
if self.doc.get("software") != "workbench-script":
if self.is_legacy():
return self.doc['device']['model']
chassis = self.dmi.get("Chassis")[0].get("Type", '_virtual')
@ -132,3 +132,6 @@ class Evidence:
def set_components(self):
snapshot = ParseSnapshot(self.doc).snapshot_json
self.components = snapshot['components']
def is_legacy(self):
return self.doc.get("software") != "workbench-script"