feature/90-implement-public-website-for-device #17

Merged
cayop merged 32 commits from feature/90-implement-public-website-for-device into main 2024-10-30 15:02:18 +00:00
9 changed files with 353 additions and 96 deletions
Showing only changes of commit 23c0b004a0 - Show all commits

View file

@ -6,9 +6,11 @@ from django.urls import path
app_name = 'api'
urlpatterns = [
path('snapshot/', views.NewSnapshot, name='new_snapshot'),
path('tokens/', views.TokenView.as_view(), name='tokens'),
path('tokens/new', views.TokenNewView.as_view(), name='new_token'),
path("tokens/<int:pk>/edit", views.EditTokenView.as_view(), name="edit_token"),
path('tokens/<int:pk>/del', views.TokenDeleteView.as_view(), name='delete_token'),
path('v1/snapshot/', views.NewSnapshotView.as_view(), name='new_snapshot'),
path('v1/annotation/<str:pk>/', views.AddAnnotationView.as_view(), name='new_annotation'),
path('v1/device/<str:pk>/', views.DetailsDeviceView.as_view(), name='device'),
path('v1/tokens/', views.TokenView.as_view(), name='tokens'),
path('v1/tokens/new', views.TokenNewView.as_view(), name='new_token'),
path("v1/tokens/<int:pk>/edit", views.EditTokenView.as_view(), name="edit_token"),
path('v1/tokens/<int:pk>/del', views.TokenDeleteView.as_view(), name='delete_token'),
]

View file

@ -1,4 +1,6 @@
import json
import uuid
import logging
from uuid import uuid4
@ -7,6 +9,7 @@ from django.http import JsonResponse
from django.shortcuts import get_object_or_404, redirect
from django.utils.translation import gettext_lazy as _
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from django_tables2 import SingleTableView
from django.views.generic.edit import (
CreateView,
@ -15,81 +18,118 @@ from django.views.generic.edit import (
)
from utils.save_snapshots import move_json, save_in_disk
from django.views.generic.edit import View
from dashboard.mixins import DashboardView
from evidence.models import Annotation
from evidence.parse_details import ParseSnapshot
from evidence.parse import Build
from device.models import Device
from api.models import Token
from api.tables import TokensTable
@csrf_exempt
def NewSnapshot(request):
# Accept only posts
if request.method != 'POST':
return JsonResponse({'error': 'Invalid request method'}, status=400)
# Authentication
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
token = auth_header.split(' ')[1]
tk = Token.objects.filter(token=token).first()
if not tk:
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
# Validation snapshot
try:
data = json.loads(request.body)
except json.JSONDecodeError:
return JsonResponse({'error': 'Invalid JSON'}, status=400)
# try:
# Build(data, None, check=True)
# except Exception:
# return JsonResponse({'error': 'Invalid Snapshot'}, status=400)
exist_annotation = Annotation.objects.filter(
uuid=data['uuid']
).first()
if exist_annotation:
txt = "error: the snapshot {} exist".format(data['uuid'])
return JsonResponse({'status': txt}, status=500)
# Process snapshot
path_name = save_in_disk(data, tk.owner.institution.name)
try:
Build(data, tk.owner)
except Exception as err:
return JsonResponse({'status': f"fail: {err}"}, status=500)
annotation = Annotation.objects.filter(
uuid=data['uuid'],
type=Annotation.Type.SYSTEM,
# TODO this is hardcoded, it should select the user preferred algorithm
key="hidalgo1",
owner=tk.owner.institution
).first()
logger = logging.getLogger('django')
if not annotation:
return JsonResponse({'status': 'fail'}, status=500)
class ApiMixing(View):
url_args = reverse_lazy("device:details", args=(annotation.value,))
url = request.build_absolute_uri(url_args)
@method_decorator(csrf_exempt)
def dispatch(self, *args, **kwargs):
return super().dispatch(*args, **kwargs)
response = {
"status": "success",
"dhid": annotation.value[:6].upper(),
"url": url,
# TODO replace with public_url when available
"public_url": url
}
move_json(path_name, tk.owner.institution.name)
def auth(self):
# Authentication
auth_header = self.request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
logger.exception("Invalid or missing token {}".format(auth_header))
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
return JsonResponse(response, status=200)
token = auth_header.split(' ')[1].strip("'").strip('"')
try:
uuid.UUID(token)
except Exception:
logger.exception("Invalid token {}".format(token))
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
self.tk = Token.objects.filter(token=token).first()
if not self.tk:
logger.exception("Invalid or missing token {}".format(token))
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
class NewSnapshotView(ApiMixing):
def get(self, request, *args, **kwargs):
return JsonResponse({}, status=404)
def post(self, request, *args, **kwargs):
response = self.auth()
if response:
return response
# Validation snapshot
try:
data = json.loads(request.body)
except json.JSONDecodeError:
logger.exception("Invalid Snapshot of user {}".format(self.tk.owner))
return JsonResponse({'error': 'Invalid JSON'}, status=500)
# Process snapshot
path_name = save_in_disk(data, self.tk.owner.institution.name)
# try:
# Build(data, None, check=True)
# except Exception:
# return JsonResponse({'error': 'Invalid Snapshot'}, status=400)
if not data.get("uuid"):
txt = "error: the snapshot not have uuid"
logger.exception(txt)
return JsonResponse({'status': txt}, status=500)
exist_annotation = Annotation.objects.filter(
uuid=data['uuid']
).first()
if exist_annotation:
txt = "error: the snapshot {} exist".format(data['uuid'])
logger.exception(txt)
return JsonResponse({'status': txt}, status=500)
try:
Build(data, self.tk.owner)
except Exception as err:
logger.exception(err)
return JsonResponse({'status': f"fail: {err}"}, status=500)
annotation = Annotation.objects.filter(
uuid=data['uuid'],
type=Annotation.Type.SYSTEM,
# TODO this is hardcoded, it should select the user preferred algorithm
key="hidalgo1",
owner=self.tk.owner.institution
).first()
if not annotation:
logger.exception("Error: No annotation for uuid: {}".format(data["uuid"]))
return JsonResponse({'status': 'fail'}, status=500)
url_args = reverse_lazy("device:details", args=(annotation.value,))
url = request.build_absolute_uri(url_args)
response = {
"status": "success",
"dhid": annotation.value[:6].upper(),
"url": url,
# TODO replace with public_url when available
"public_url": url
}
move_json(path_name, self.tk.owner.institution.name)
return JsonResponse(response, status=200)
class TokenView(DashboardView, SingleTableView):
@ -165,3 +205,99 @@ class EditTokenView(DashboardView, UpdateView):
)
kwargs = super().get_form_kwargs()
return kwargs
class DetailsDeviceView(ApiMixing):
def get(self, request, *args, **kwargs):
response = self.auth()
if response:
return response
self.pk = kwargs['pk']
self.object = Device(id=self.pk)
if not self.object.last_evidence:
return JsonResponse({}, status=404)
if self.object.owner != self.tk.owner.institution:
return JsonResponse({}, status=403)
data = self.get_data()
return JsonResponse(data, status=200)
def post(self, request, *args, **kwargs):
return JsonResponse({}, status=404)
def get_data(self):
data = {}
self.object.initial()
self.object.get_last_evidence()
evidence = self.object.last_evidence
if evidence.is_legacy():
data.update({
"device": evidence.get("device"),
"components": evidence.get("components"),
})
else:
evidence.get_doc()
snapshot = ParseSnapshot(evidence.doc).snapshot_json
data.update({
"device": snapshot.get("device"),
"components": snapshot.get("components"),
})
uuids = Annotation.objects.filter(
owner=self.tk.owner.institution,
value=self.pk
).values("uuid")
annotations = Annotation.objects.filter(
uuid__in=uuids,
owner=self.tk.owner.institution,
type = Annotation.Type.USER
).values_list("key", "value")
data.update({"annotations": list(annotations)})
return data
class AddAnnotationView(ApiMixing):
def post(self, request, *args, **kwargs):
response = self.auth()
if response:
return response
self.pk = kwargs['pk']
institution = self.tk.owner.institution
self.annotation = Annotation.objects.filter(
owner=institution,
value=self.pk,
type=Annotation.Type.SYSTEM
).first()
if not self.annotation:
return JsonResponse({}, status=404)
try:
data = json.loads(request.body)
key = data["key"]
value = data["value"]
except Exception:
logger.exception("Invalid Snapshot of user {}".format(self.tk.owner))
return JsonResponse({'error': 'Invalid JSON'}, status=500)
Annotation.objects.create(
uuid=self.annotation.uuid,
owner=self.tk.owner.institution,
type = Annotation.Type.USER,
key = key,
value = value
)
return JsonResponse({"status": "success"}, status=200)
def get(self, request, *args, **kwargs):
return JsonResponse({}, status=404)

View file

@ -66,7 +66,7 @@ class SearchView(InventaryMixin):
limit
)
if not matches.size():
if not matches or not matches.size():
return self.search_hids(query, offset, limit)
devices = []

View file

@ -1,8 +1,7 @@
from django.db import models, connection
from utils.constants import STR_SM_SIZE, STR_SIZE, STR_EXTEND_SIZE, ALGOS
from utils.constants import ALGOS
from evidence.models import Annotation, Evidence
from user.models import User
from lot.models import DeviceLot

View file

@ -0,0 +1,86 @@
import os
import json
import logging
from django.core.management.base import BaseCommand
from django.conf import settings
from utils.device import create_annotation, create_doc, create_index
from user.models import Institution
from evidence.parse import Build
logger = logging.getLogger('django')
class Command(BaseCommand):
help = "Reindex snapshots"
snapshots = []
EVIDENCES = settings.EVIDENCES_DIR
def handle(self, *args, **kwargs):
if os.path.isdir(self.EVIDENCES):
self.read_files(self.EVIDENCES)
self.parsing()
def read_files(self, directory):
for filename in os.listdir(directory):
filepath = os.path.join(directory, filename)
if not os.path.isdir(filepath):
continue
institution = Institution.objects.filter(name=filename).first()
if not institution:
continue
user = institution.user_set.filter(is_admin=True).first()
if not user:
txt = "Error No there are Admins for the institution: {}".format(
institution.name
)
logger.exception(txt)
continue
snapshots_path = os.path.join(filepath, "snapshots")
placeholders_path = os.path.join(filepath, "placeholders")
for f in os.listdir(snapshots_path):
f_path = os.path.join(snapshots_path, f)
if f_path[-5:] == ".json" and os.path.isfile(f_path):
self.open(f_path, user)
for f in os.listdir(placeholders_path):
f_path = os.path.join(placeholders_path, f)
if f_path[-5:] == ".json" and os.path.isfile(f_path):
self.open(f_path, user)
def open(self, filepath, user):
with open(filepath, 'r') as file:
content = json.loads(file.read())
self.snapshots.append((content, user, filepath))
def parsing(self):
for s, user, f_path in self.snapshots:
if s.get("type") == "Websnapshot":
self.build_placeholder(s, user, f_path)
else:
self.build_snapshot(s, user, f_path)
def build_placeholder(self, s, user, f_path):
try:
create_index(s, user)
create_annotation(s, user, commit=True)
except Exception as err:
txt = "Error: in placeholder {} \n{}".format(f_path, err)
logger.exception(txt)
def build_snapshot(self, s, user, f_path):
try:
Build(s, user)
except Exception as err:
txt = "Error: in Snapshot {} \n{}".format(f_path, err)
logger.exception(txt)

View file

@ -3,7 +3,7 @@ import json
from dmidecode import DMIParse
from django.db import models
from utils.constants import STR_SM_SIZE, STR_EXTEND_SIZE, CHASSIS_DH
from utils.constants import STR_EXTEND_SIZE, CHASSIS_DH
from evidence.xapian import search
from evidence.parse_details import ParseSnapshot
from user.models import User, Institution
@ -61,13 +61,13 @@ class Evidence:
self.get_owner()
qry = 'uuid:"{}"'.format(self.uuid)
matches = search(self.owner, qry, limit=1)
if matches.size() < 0:
if matches and matches.size() < 0:
return
for xa in matches:
self.doc = json.loads(xa.document.get_data())
if self.doc.get("software") == "workbench-script":
if not self.is_legacy():
dmidecode_raw = self.doc["data"]["dmidecode"]
self.dmi = DMIParse(dmidecode_raw)
@ -80,7 +80,7 @@ class Evidence:
self.created = self.annotations.last().created
def get_components(self):
if self.doc.get("software") != "workbench-script":
if self.is_legacy():
return self.doc.get('components', [])
self.set_components()
return self.components
@ -92,7 +92,7 @@ class Evidence:
return ""
return list(self.doc.get('kv').values())[0]
if self.doc.get("software") != "workbench-script":
if self.is_legacy():
return self.doc['device']['manufacturer']
return self.dmi.manufacturer().strip()
@ -104,13 +104,13 @@ class Evidence:
return ""
return list(self.doc.get('kv').values())[1]
if self.doc.get("software") != "workbench-script":
if self.is_legacy():
return self.doc['device']['model']
return self.dmi.model().strip()
def get_chassis(self):
if self.doc.get("software") != "workbench-script":
if self.is_legacy():
return self.doc['device']['model']
chassis = self.dmi.get("Chassis")[0].get("Type", '_virtual')
@ -132,3 +132,6 @@ class Evidence:
def set_components(self):
snapshot = ParseSnapshot(self.doc).snapshot_json
self.components = snapshot['components']
def is_legacy(self):
return self.doc.get("software") != "workbench-script"

View file

@ -1,15 +1,16 @@
import os
import json
import shutil
import hashlib
import logging
from datetime import datetime
from dmidecode import DMIParse
from json_repair import repair_json
from evidence.models import Annotation
from evidence.xapian import index
from utils.constants import ALGOS, CHASSIS_DH
from utils.constants import CHASSIS_DH
logger = logging.getLogger('django')
def get_network_cards(child, nets):
@ -17,15 +18,18 @@ def get_network_cards(child, nets):
nets.append(child)
if child.get('children'):
[get_network_cards(x, nets) for x in child['children']]
def get_mac(lshw):
nets = []
try:
hw = json.loads(lshw)
if type(lshw) is dict:
hw = lshw
else:
hw = json.loads(lshw)
except json.decoder.JSONDecodeError:
hw = json.loads(repair_json(lshw))
try:
get_network_cards(hw, nets)
except Exception as ss:
@ -74,10 +78,21 @@ class Build:
serial_number = device.get("serialNumber", '')
sku = device.get("sku", '')
hid = f"{manufacturer}{model}{chassis}{serial_number}{sku}"
return hashlib.sha3_256(hid.encode()).hexdigest()
def create_annotations(self):
annotation = Annotation.objects.filter(
uuid=self.uuid,
owner=self.user.institution,
type=Annotation.Type.SYSTEM,
)
if annotation:
txt = "Warning: Snapshot {} exist as annotation !!".format(self.uuid)
logger.exception(txt)
return
for k, v in self.algorithms.items():
Annotation.objects.create(
@ -99,7 +114,7 @@ class Build:
def get_sku(self):
return self.dmi.get("System")[0].get("SKU Number", "n/a").strip()
def get_chassis(self):
return self.dmi.get("Chassis")[0].get("Type", '_virtual')
@ -115,7 +130,7 @@ class Build:
if not snapshot["data"].get('lshw'):
return f"{manufacturer}{model}{chassis}{serial_number}{sku}"
lshw = snapshot["data"]["lshw"]
# mac = get_mac2(hwinfo_raw) or ""
mac = get_mac(lshw) or ""

View file

@ -11,7 +11,10 @@ import xapian
def search(institution, qs, offset=0, limit=10):
database = xapian.Database("db")
try:
database = xapian.Database("db")
except (xapian.DatabaseNotFoundError, xapian.DatabaseOpeningError):
return
qp = xapian.QueryParser()
qp.set_database(database)
@ -31,12 +34,9 @@ def search(institution, qs, offset=0, limit=10):
def index(institution, uuid, snap):
uuid = 'uuid:"{}"'.format(uuid)
try:
matches = search(institution, uuid, limit=1)
if matches.size() > 0:
return
except (xapian.DatabaseNotFoundError, xapian.DatabaseOpeningError):
pass
matches = search(institution, uuid, limit=1)
if matches and matches.size() > 0:
return
database = xapian.WritableDatabase("db", xapian.DB_CREATE_OR_OPEN)
indexer = xapian.TermGenerator()

View file

@ -2,12 +2,17 @@ import json
import uuid
import hashlib
import datetime
import logging
from django.core.exceptions import ValidationError
from evidence.xapian import index
from evidence.models import Annotation
from device.models import Device
logger = logging.getLogger('django')
def create_doc(data):
if not data:
return
@ -76,6 +81,17 @@ def create_annotation(doc, user, commit=False):
'value': doc['CUSTOMER_ID'],
}
if commit:
annotation = Annotation.objects.filter(
uuid=doc["uuid"],
owner=user.institution,
type=Annotation.Type.SYSTEM,
)
if annotation:
txt = "Warning: Snapshot {} exist as annotation !!".format(doc["uuid"])
logger.exception(txt)
return annotation
return Annotation.objects.create(**data)
return Annotation(**data)