api_v1 #15

Merged
cayop merged 8 commits from api_v1 into main 2024-10-24 10:00:11 +00:00
4 changed files with 216 additions and 76 deletions

View File

@ -6,9 +6,11 @@ from django.urls import path
app_name = 'api' app_name = 'api'
urlpatterns = [ urlpatterns = [
path('snapshot/', views.NewSnapshot, name='new_snapshot'), path('v1/snapshot/', views.NewSnapshotView.as_view(), name='new_snapshot'),
path('tokens/', views.TokenView.as_view(), name='tokens'), path('v1/annotation/<str:pk>/', views.AddAnnotationView.as_view(), name='new_annotation'),
path('tokens/new', views.TokenNewView.as_view(), name='new_token'), path('v1/device/<str:pk>/', views.DetailsDeviceView.as_view(), name='device'),
path("tokens/<int:pk>/edit", views.EditTokenView.as_view(), name="edit_token"), path('v1/tokens/', views.TokenView.as_view(), name='tokens'),
path('tokens/<int:pk>/del', views.TokenDeleteView.as_view(), name='delete_token'), path('v1/tokens/new', views.TokenNewView.as_view(), name='new_token'),
path("v1/tokens/<int:pk>/edit", views.EditTokenView.as_view(), name="edit_token"),
path('v1/tokens/<int:pk>/del', views.TokenDeleteView.as_view(), name='delete_token'),
] ]

View File

@ -1,4 +1,6 @@
import json import json
import uuid
import logging
from uuid import uuid4 from uuid import uuid4
@ -7,6 +9,7 @@ from django.http import JsonResponse
from django.shortcuts import get_object_or_404, redirect from django.shortcuts import get_object_or_404, redirect
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from django.views.decorators.csrf import csrf_exempt from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from django_tables2 import SingleTableView from django_tables2 import SingleTableView
from django.views.generic.edit import ( from django.views.generic.edit import (
CreateView, CreateView,
@ -15,54 +18,90 @@ from django.views.generic.edit import (
) )
from utils.save_snapshots import move_json, save_in_disk from utils.save_snapshots import move_json, save_in_disk
from django.views.generic.edit import View
from dashboard.mixins import DashboardView from dashboard.mixins import DashboardView
from evidence.models import Annotation from evidence.models import Annotation
from evidence.parse_details import ParseSnapshot
from evidence.parse import Build from evidence.parse import Build
from device.models import Device
from api.models import Token from api.models import Token
from api.tables import TokensTable from api.tables import TokensTable
@csrf_exempt logger = logging.getLogger('django')
def NewSnapshot(request):
# Accept only posts
if request.method != 'POST':
return JsonResponse({'error': 'Invalid request method'}, status=400)
class ApiMixing(View):
@method_decorator(csrf_exempt)
def dispatch(self, *args, **kwargs):
return super().dispatch(*args, **kwargs)
def auth(self):
# Authentication # Authentication
auth_header = request.headers.get('Authorization') auth_header = self.request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '): if not auth_header or not auth_header.startswith('Bearer '):
logger.exception("Invalid or missing token {}".format(auth_header))
return JsonResponse({'error': 'Invalid or missing token'}, status=401) return JsonResponse({'error': 'Invalid or missing token'}, status=401)
token = auth_header.split(' ')[1] token = auth_header.split(' ')[1].strip("'").strip('"')
tk = Token.objects.filter(token=token).first() try:
if not tk: uuid.UUID(token)
except Exception:
logger.exception("Invalid token {}".format(token))
return JsonResponse({'error': 'Invalid or missing token'}, status=401) return JsonResponse({'error': 'Invalid or missing token'}, status=401)
self.tk = Token.objects.filter(token=token).first()
if not self.tk:
logger.exception("Invalid or missing token {}".format(token))
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
class NewSnapshotView(ApiMixing):
def get(self, request, *args, **kwargs):
return JsonResponse({}, status=404)
def post(self, request, *args, **kwargs):
response = self.auth()
if response:
return response
# Validation snapshot # Validation snapshot
try: try:
data = json.loads(request.body) data = json.loads(request.body)
except json.JSONDecodeError: except json.JSONDecodeError:
return JsonResponse({'error': 'Invalid JSON'}, status=400) logger.exception("Invalid Snapshot of user {}".format(self.tk.owner))
return JsonResponse({'error': 'Invalid JSON'}, status=500)
# Process snapshot
path_name = save_in_disk(data, self.tk.owner.institution.name)
# try: # try:
# Build(data, None, check=True) # Build(data, None, check=True)
# except Exception: # except Exception:
# return JsonResponse({'error': 'Invalid Snapshot'}, status=400) # return JsonResponse({'error': 'Invalid Snapshot'}, status=400)
if not data.get("uuid"):
txt = "error: the snapshot not have uuid"
logger.exception(txt)
return JsonResponse({'status': txt}, status=500)
exist_annotation = Annotation.objects.filter( exist_annotation = Annotation.objects.filter(
uuid=data['uuid'] uuid=data['uuid']
).first() ).first()
if exist_annotation: if exist_annotation:
txt = "error: the snapshot {} exist".format(data['uuid']) txt = "error: the snapshot {} exist".format(data['uuid'])
logger.exception(txt)
return JsonResponse({'status': txt}, status=500) return JsonResponse({'status': txt}, status=500)
# Process snapshot
path_name = save_in_disk(data, tk.owner.institution.name)
try: try:
Build(data, tk.owner) Build(data, self.tk.owner)
except Exception as err: except Exception as err:
logger.exception(err)
return JsonResponse({'status': f"fail: {err}"}, status=500) return JsonResponse({'status': f"fail: {err}"}, status=500)
annotation = Annotation.objects.filter( annotation = Annotation.objects.filter(
@ -70,11 +109,12 @@ def NewSnapshot(request):
type=Annotation.Type.SYSTEM, type=Annotation.Type.SYSTEM,
# TODO this is hardcoded, it should select the user preferred algorithm # TODO this is hardcoded, it should select the user preferred algorithm
key="hidalgo1", key="hidalgo1",
owner=tk.owner.institution owner=self.tk.owner.institution
).first() ).first()
if not annotation: if not annotation:
logger.exception("Error: No annotation for uuid: {}".format(data["uuid"]))
return JsonResponse({'status': 'fail'}, status=500) return JsonResponse({'status': 'fail'}, status=500)
url_args = reverse_lazy("device:details", args=(annotation.value,)) url_args = reverse_lazy("device:details", args=(annotation.value,))
@ -87,7 +127,7 @@ def NewSnapshot(request):
# TODO replace with public_url when available # TODO replace with public_url when available
"public_url": url "public_url": url
} }
move_json(path_name, tk.owner.institution.name) move_json(path_name, self.tk.owner.institution.name)
return JsonResponse(response, status=200) return JsonResponse(response, status=200)
@ -165,3 +205,99 @@ class EditTokenView(DashboardView, UpdateView):
) )
kwargs = super().get_form_kwargs() kwargs = super().get_form_kwargs()
return kwargs return kwargs
class DetailsDeviceView(ApiMixing):
def get(self, request, *args, **kwargs):
response = self.auth()
if response:
return response
self.pk = kwargs['pk']
self.object = Device(id=self.pk)
if not self.object.last_evidence:
return JsonResponse({}, status=404)
if self.object.owner != self.tk.owner.institution:
return JsonResponse({}, status=403)
data = self.get_data()
return JsonResponse(data, status=200)
def post(self, request, *args, **kwargs):
return JsonResponse({}, status=404)
def get_data(self):
data = {}
self.object.initial()
self.object.get_last_evidence()
evidence = self.object.last_evidence
if evidence.is_legacy():
data.update({
"device": evidence.get("device"),
"components": evidence.get("components"),
})
else:
evidence.get_doc()
snapshot = ParseSnapshot(evidence.doc).snapshot_json
data.update({
"device": snapshot.get("device"),
"components": snapshot.get("components"),
})
uuids = Annotation.objects.filter(
owner=self.tk.owner.institution,
value=self.pk
).values("uuid")
annotations = Annotation.objects.filter(
uuid__in=uuids,
owner=self.tk.owner.institution,
type = Annotation.Type.USER
).values_list("key", "value")
data.update({"annotations": list(annotations)})
return data
class AddAnnotationView(ApiMixing):
def post(self, request, *args, **kwargs):
response = self.auth()
if response:
return response
self.pk = kwargs['pk']
institution = self.tk.owner.institution
self.annotation = Annotation.objects.filter(
owner=institution,
value=self.pk,
type=Annotation.Type.SYSTEM
).first()
if not self.annotation:
return JsonResponse({}, status=404)
try:
data = json.loads(request.body)
key = data["key"]
value = data["value"]
except Exception:
logger.exception("Invalid Snapshot of user {}".format(self.tk.owner))
return JsonResponse({'error': 'Invalid JSON'}, status=500)
Annotation.objects.create(
uuid=self.annotation.uuid,
owner=self.tk.owner.institution,
type = Annotation.Type.USER,
key = key,
value = value
)
return JsonResponse({"status": "success"}, status=200)
def get(self, request, *args, **kwargs):
return JsonResponse({}, status=404)

View File

@ -1,8 +1,7 @@
from django.db import models, connection from django.db import models, connection
from utils.constants import STR_SM_SIZE, STR_SIZE, STR_EXTEND_SIZE, ALGOS from utils.constants import ALGOS
from evidence.models import Annotation, Evidence from evidence.models import Annotation, Evidence
from user.models import User
from lot.models import DeviceLot from lot.models import DeviceLot

View File

@ -3,7 +3,7 @@ import json
from dmidecode import DMIParse from dmidecode import DMIParse
from django.db import models from django.db import models
from utils.constants import STR_SM_SIZE, STR_EXTEND_SIZE, CHASSIS_DH from utils.constants import STR_EXTEND_SIZE, CHASSIS_DH
from evidence.xapian import search from evidence.xapian import search
from evidence.parse_details import ParseSnapshot from evidence.parse_details import ParseSnapshot
from user.models import User, Institution from user.models import User, Institution
@ -67,7 +67,7 @@ class Evidence:
for xa in matches: for xa in matches:
self.doc = json.loads(xa.document.get_data()) self.doc = json.loads(xa.document.get_data())
if self.doc.get("software") == "workbench-script": if not self.is_legacy():
dmidecode_raw = self.doc["data"]["dmidecode"] dmidecode_raw = self.doc["data"]["dmidecode"]
self.dmi = DMIParse(dmidecode_raw) self.dmi = DMIParse(dmidecode_raw)
@ -80,7 +80,7 @@ class Evidence:
self.created = self.annotations.last().created self.created = self.annotations.last().created
def get_components(self): def get_components(self):
if self.doc.get("software") != "workbench-script": if self.is_legacy():
return self.doc.get('components', []) return self.doc.get('components', [])
self.set_components() self.set_components()
return self.components return self.components
@ -92,7 +92,7 @@ class Evidence:
return "" return ""
return list(self.doc.get('kv').values())[0] return list(self.doc.get('kv').values())[0]
if self.doc.get("software") != "workbench-script": if self.is_legacy():
return self.doc['device']['manufacturer'] return self.doc['device']['manufacturer']
return self.dmi.manufacturer().strip() return self.dmi.manufacturer().strip()
@ -104,13 +104,13 @@ class Evidence:
return "" return ""
return list(self.doc.get('kv').values())[1] return list(self.doc.get('kv').values())[1]
if self.doc.get("software") != "workbench-script": if self.is_legacy():
return self.doc['device']['model'] return self.doc['device']['model']
return self.dmi.model().strip() return self.dmi.model().strip()
def get_chassis(self): def get_chassis(self):
if self.doc.get("software") != "workbench-script": if self.is_legacy():
return self.doc['device']['model'] return self.doc['device']['model']
chassis = self.dmi.get("Chassis")[0].get("Type", '_virtual') chassis = self.dmi.get("Chassis")[0].get("Type", '_virtual')
@ -132,3 +132,6 @@ class Evidence:
def set_components(self): def set_components(self):
snapshot = ParseSnapshot(self.doc).snapshot_json snapshot = ParseSnapshot(self.doc).snapshot_json
self.components = snapshot['components'] self.components = snapshot['components']
def is_legacy(self):
return self.doc.get("software") != "workbench-script"