Merge branch 'main' into data-institution

This commit is contained in:
Cayo Puigdefabregas 2024-10-14 11:37:41 +02:00
commit 4e9df475c3
23 changed files with 735 additions and 280 deletions

2
.env.example Normal file
View File

@ -0,0 +1,2 @@
DOMAIN=localhost
DEMO=false

View File

@ -34,14 +34,14 @@ def NewSnapshot(request):
return JsonResponse({'error': 'Invalid request method'}, status=400)
# Authentication
# auth_header = request.headers.get('Authorization')
# if not auth_header or not auth_header.startswith('Bearer '):
# return JsonResponse({'error': 'Invalid or missing token'}, status=401)
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
# token = auth_header.split(' ')[1]
# tk = Token.objects.filter(token=token).first()
# if not tk:
# return JsonResponse({'error': 'Invalid or missing token'}, status=401)
token = auth_header.split(' ')[1]
tk = Token.objects.filter(token=token).first()
if not tk:
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
# Validation snapshot
try:
@ -65,9 +65,7 @@ def NewSnapshot(request):
# save_in_disk(data, tk.user)
try:
# Build(data, tk.user)
user = User.objects.get(email="user@example.org")
Build(data, user)
Build(data, tk.user)
except Exception:
return JsonResponse({'status': 'fail'}, status=200)

View File

@ -1,213 +0,0 @@
{% load i18n static %}
<!doctype html>
<html lang="en">
<head>
{% block head %}
{% block meta %}
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<meta name="robots" content="NONE,NOARCHIVE" />
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="">
<meta name="author" content="Pangea">
{% endblock %}
<title>{% block title %}{% if title %}{{ title }} {% endif %}DeviceHub{% endblock %}</title>
<!-- Bootstrap core CSS -->
{% block style %}
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap-icons@1.11.1/font/bootstrap-icons.css">
<link rel="stylesheet" href= "https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.4.2/css/all.min.css">
<link href="{% static "/css/bootstrap.min.css" %}" rel="stylesheet">
<style>
.bd-placeholder-img {
font-size: 1.125rem;
text-anchor: middle;
-webkit-user-select: none;
-moz-user-select: none;
user-select: none;
}
@media (min-width: 768px) {
.bd-placeholder-img-lg {
font-size: 3.5rem;
}
}
html, body {
height: 100%;
}
body {
display: flex;
flex-direction: column;
}
.main-content {
flex-grow: 1;
}
footer {
width: 100%;
}
</style>
<!-- Custom styles for this template -->
<link href="{% static "/css/dashboard.css" %}" rel="stylesheet">
{% endblock %}
{% endblock %}
</head>
<body>
<header class="navbar navbar-dark sticky-top admin bg-green flex-md-nowrap p-0 shadow">
<a class="navbar-brand col-md-3 col-lg-2 me-0 px-3" href="#">DEVICE HUB</a>
<div class="navbar-nav navbar-sub-brand">
PANGEA
</div>
<div class="navbar-nav">
<div class="nav-item text-nowrap">
<i id="user-avatar" class="bi bi-person-circle"></i>
<a class="navbar-sub-brand px-3" href="#">{{ user.email }}</a>
<a class="logout" href="{% url 'login:logout' %}">
<i class="fa-solid fa-arrow-right-from-bracket"></i>
</a>
</div>
</div>
</header>
<div class="container-fluid">
<div class="row">
<nav id="sidebarMenu" class="col-md-3 col-lg-2 d-md-block bg-light sidebar collapse">
<div class="position-sticky pt-5">
<ul class="nav flex-column">
<li class="nav-item">
<a class="admin nav-link {% if section == 'Home' %}active {% endif %}fw-bold" href="{% url 'dashboard:dashboard' %}">
<i class="bi bi-house-door icon_sidebar"></i>
{% trans 'Dashboard' %}
</a>
<hr />
</li>
<li class="nav-item">
<a class="admin {% if section == 'People' %}active {% endif %}nav-link fw-bold" data-bs-toggle="collapse" data-bs-target="#people" aria-expanded="false" aria-controls="people" href="javascript:void()">
<i class="bi bi-people icon_sidebar"></i>
{% trans 'Users' %}
</a>
<ul class="flex-column mb-2 ul_sidebar accordion-collapse {% if section == 'People' %}expanded{% else %}collapse{% endif %}" id="people" data-bs-parent="#sidebarMenu">
<li class="nav-item">
<a class="nav-link{% if path == 'admin_people_list' %} active2{% endif %}" href="{# url 'idhub:admin_people_list' #}">
{% trans 'View users' %}
</a>
</li>
<li class="nav-item">
<a class="nav-link{% if path == 'admin_people_new' %} active2{% endif %}" href="{# url 'idhub:admin_people_new' #}">
{% trans 'Add user' %}
</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="admin nav-link {% if section == 'AccessControl' %}active {% endif %}fw-bold" data-bs-toggle="collapse" data-bs-target="#control-access" aria-expanded="false" aria-controls="control-access" href="javascript:void()">
<i class="fa-solid fa-arrow-right-from-bracket icon_sidebar"></i>
{% trans 'Roles' %}
</a>
<ul class="flex-column mb-2 ul_sidebar accordion-collapse {% if section == 'AccessControl' %}expanded{% else %}collapse{% endif %}" id="control-access" data-bs-parent="#sidebarMenu">
<li class="nav-item">
<a class="nav-link{% if path == 'admin_roles' %} active2{% endif %}" href="{# url 'idhub:admin_roles' #}">
{% trans 'Manage roles' %}
</a>
</li>
<li class="nav-item">
<a class="nav-link{% if path == 'admin_services' %} active2{% endif %}" href="{# url 'idhub:admin_services' #}">
{% trans 'Manage services' %}
</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="admin nav-link {% if section == 'Credential' %}active {% endif %}fw-bold" data-bs-toggle="collapse" data-bs-target="#credential" aria-expanded="false" aria-controls="credential" href="javascript:void()">
<i class="bi bi-patch-check icon_sidebar"></i>
{% trans 'Credentials' %}
</a>
<ul class="flex-column mb-2 ul_sidebar accordion-collapse {% if section == 'Credential' %}expanded{% else %}collapse{% endif %}" id="credential" data-bs-parent="#sidebarMenu">
<li class="nav-item">
<a class="nav-link{% if path == 'admin_credentials' %} active2{% endif %}" href="{# url 'idhub:admin_credentials' #}">
{% trans 'View credentials' %}
</a>
</li>
<li class="nav-item">
<a id="wallet" class="nav-link" data-bs-toggle="collapse" data-bs-target="#lwallet" aria-expanded="false" aria-controls="lwallet" href="javascript:void()">
{% trans "Organization's wallet" %}
</a>
<ul class="flex-column mb-2 accordion-collapse {% if wallet %}expanded{% else %}collapse{% endif %}" id="lwallet" data-bs-parent="#wallet">
<li class="nav-item">
<a class="nav-link{% if path == 'admin_dids' %} active2{% endif %}" href="{# url 'idhub:admin_dids' #}">
{% trans 'Manage Identities (DIDs)' %}
</a>
</li>
<li class="nav-item">
<a class="nav-link{% if path == 'admin_wallet_credentials' %} active2{% endif %}" href="{# url 'idhub:admin_wallet_credentials' #}">
{% trans 'View org. credentials' %}
</a>
</li>
<li class="nav-item">
<a class="nav-link{% if path == 'admin_wallet_config_issue' %} active2{% endif %}" href="{# url 'idhub:admin_wallet_config_issue' #}">
{% trans 'Configure credential issuance' %}
</a>
</li>
</ul>
</li>
</ul>
</li>
<li class="nav-item">
<a class="admin nav-link {% if section == 'Templates' %}active {% endif %}fw-bold" href="{# url 'idhub:admin_schemas' #}">
<i class="bi bi-file-earmark-text icon_sidebar"></i>
{% trans 'Templates' %}
</a>
</li>
<li class="nav-item">
<a class="admin nav-link {% if section == 'ImportExport' %}active {% endif %}fw-bold" href="{# url 'idhub:admin_import' #}">
<i class="bi bi-arrow-down-square icon_sidebar"></i>
{% trans 'Data' %}
</a>
</li>
</ul>
</div>
</nav>
<main class="col-md-9 ms-sm-auto col-lg-10 px-md-4">
{% block messages %}
{% for message in messages %}
<div class="alert {{ message.tags|default:'info' }} alert-dismissible fade show mt-3" role="alert">
{{ message }}
<button type="button" class="btn-close" data-bs-dismiss="alert" aria-label="Close">
</button>
</div>
{% endfor %}
{% endblock messages %}
<div class="d-flex justify-content-between flex-wrap flex-md-nowrap align-items-center pt-3 pb-2 mb-3 border-bottom">
<h1 class="h2">{{ title }}</h1>
<div class="btn-toolbar mb-2 mb-md-0">
</div>
</div>
{% block content %}
{% endblock content %}
</main>
</div>
</div>
<!-- Footer -->
<footer class="footer text-center mt-auto py-3">
<div class="container">
<span class="text-muted">{{ commit_id }}</span>
</div>
</footer>
{% block script %}
<script src="{% static "js/jquery-3.3.1.slim.min.js" %}"></script>
<script src="{% static "js/popper.min.js" %}"></script>
<script src="{% static "js/bootstrap.min.js" %}"></script>
{% block extrascript %}{% endblock %}
{% endblock %}
</body>
</html>

View File

@ -35,7 +35,19 @@
<thead>
<tr>
<th scope="col" data-sortable="">
<a class="dataTable-sorter" href="#">Title</a>
select
</th>
<th scope="col" data-sortable="">
shortid
</th>
<th scope="col" data-sortable="">
type
</th>
<th scope="col" data-sortable="">
manufacturer
</th>
<th scope="col" data-sortable="">
model
</th>
</tr>
</thead>
@ -47,9 +59,18 @@
</td>
<td>
<a href="{% url 'device:details' dev.id %}">
{{ dev.type }} {{ dev.manufacturer }} {{ dev.model }}
{{ dev.shortid }}
</a>
</td>
<td>
{{ dev.type }}
</td>
<td>
{{ dev.manufacturer }}
</td>
<td>
{{ dev.model }}
</td>
</tr>
</tbody>
{% endfor %}

View File

@ -23,7 +23,7 @@ DEVICE_TYPES = [
class DeviceForm(forms.Form):
type = forms.ChoiceField(choices = DEVICE_TYPES, required=False)
amount = forms.IntegerField(required=False, initial=1)
customer_id = forms.CharField(required=False)
custom_id = forms.CharField(required=False)
name = forms.CharField(required=False)
value = forms.CharField(required=False)
@ -49,8 +49,8 @@ class BaseDeviceFormSet(forms.BaseFormSet):
row["amount"] = d["amount"]
if d.get("name"):
row[d["name"]] = d.get("value", '')
if d.get("customer_id"):
row['CUSTOMER_ID']= d["customer_id"]
if d.get("custom_id"):
row['CUSTOM_ID']= d["custom_id"]
doc = create_doc(row)
if not commit:

View File

@ -27,6 +27,7 @@ class Device:
# the id is the chid of the device
self.id = kwargs["id"]
self.pk = self.id
self.shortid = self.pk[:6]
self.algorithm = None
self.owner = None
self.annotations = []
@ -89,10 +90,10 @@ class Device:
def get_hids(self):
annotations = self.get_annotations()
self.hids = annotations.filter(
self.hids = list(set(annotations.filter(
type=Annotation.Type.SYSTEM,
key__in=ALGOS.keys(),
).values_list("value", flat=True)
).values_list("value", flat=True)))
def get_evidences(self):
if not self.uuids:
@ -102,8 +103,9 @@ class Device:
def get_last_evidence(self):
annotations = self.get_annotations()
if annotations:
annotation = annotations.first()
if not annotations.count():
return
annotation = annotations.first()
self.last_evidence = Evidence(annotation.uuid)
def last_uuid(self):
@ -174,6 +176,9 @@ class Device:
@property
def type(self):
if self.last_evidence.doc['type'] == "WebSnapshot":
return self.last_evidence.doc.get("device", {}).get("type", "")
if not self.last_evidence:
self.get_last_evidence()
return self.last_evidence.get_chassis()
@ -184,3 +189,8 @@ class Device:
self.get_last_evidence()
return self.last_evidence.get_model()
@property
def components(self):
if not self.last_evidence:
self.get_last_evidence()
return self.last_evidence.get_components()

View File

@ -4,7 +4,7 @@
{% block content %}
<div class="row">
<div class="col">
<h3>{{ object.id }}</h3>
<h3>{{ object.shortid }}</h3>
</div>
</div>
@ -173,16 +173,19 @@
<div class="tab-pane fade profile-overview" id="components">
<h5 class="card-title">Components last evidence</h5>
<div class="list-group col-6">
{% for c in object.last_evidence.doc.components %}
{% for c in object.components %}
<div class="list-group-item">
<div class="d-flex w-100 justify-content-between">
<h5 class="mb-1">{{ c.type }}</h5>
<small class="text-muted">{{ evidence.created }}</small>
</div>
<p class="mb-1">
{{ c.manufacturer }}<br />
{{ c.model }}<br />
{{ c.serialNumber }}<br />
{% for k, v in c.items %}
{% if k not in "actions,type" %}
{{ k }}: {{ v }}<br />
{% endif %}
{% endfor %}
<br />
</p>
<small class="text-muted">
</small>

View File

@ -40,15 +40,6 @@
{% endif %}
{{ form.management_form }}
<div class="container" id="formset-container">
<div class="row mb-2">
<div class="col"></div>
<div class="col-2 text-center">
<a href="javascript:void()" onclick="addForm(this);" type="button" class="btn btn-green-admin">
<i class="bi bi-plus"></i>
{% trans 'Add' %}
</a>
</div>
</div>
<div class="row mb-2">
<div class="col">
{% bootstrap_field form.0.type %}
@ -61,7 +52,18 @@
</div>
<div class="row mb-2">
<div class="col">
{% bootstrap_field form.0.customer_id %}
{% bootstrap_field form.0.custom_id %}
</div>
</div>
<div class="row mb-2">
<div class="col-10">
<span class="fw-bold">{% trans 'Component details' %}</span>
</div>
<div class="col-2 text-center">
<a href="javascript:void()" onclick="addForm(this);" type="button" class="btn btn-green-admin text-nowrap">
<i class="bi bi-plus"></i>
{% trans 'Add component' %}
</a>
</div>
</div>
{% for f in form %}

View File

@ -2,7 +2,7 @@ import json
from django.http import Http404
from django.urls import reverse_lazy
from django.shortcuts import get_object_or_404
from django.shortcuts import get_object_or_404, Http404
from django.utils.translation import gettext_lazy as _
from django.views.generic.edit import (
CreateView,
@ -21,7 +21,7 @@ class NewDeviceView(DashboardView, FormView):
template_name = "new_device.html"
title = _("New Device")
breadcrumb = "Device / New Device"
success_url = reverse_lazy('device:add')
success_url = reverse_lazy('dashboard:unassigned_devices')
form_class = DeviceFormSet
def form_valid(self, form):
@ -91,6 +91,8 @@ class DetailsView(DashboardView, TemplateView):
def get(self, request, *args, **kwargs):
self.pk = kwargs['pk']
self.object = Device(id=self.pk)
if not self.object.last_evidence:
raise Http404
if self.object.owner != self.request.user.institution:
raise Http403

View File

@ -27,10 +27,17 @@ BASE_DIR = Path(__file__).resolve().parent.parent
SECRET_KEY = "django-insecure-1p8rs@qf$$l^!vsbetagojw23kw@1ez(qi8^(s0t&#7!wyh!l3"
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
DEBUG = config('DEBUG', default=False, cast=bool)
ALLOWED_HOSTS = config('ALLOWED_HOSTS', default='[]', cast=Csv())
DOMAIN = config("DOMAIN")
assert DOMAIN not in [None, ''], "DOMAIN var is MANDATORY"
# this var is very important, we print it
print("DOMAIN: " + DOMAIN)
ALLOWED_HOSTS = config('ALLOWED_HOSTS', default=DOMAIN, cast=Csv())
assert DOMAIN in ALLOWED_HOSTS, "DOMAIN is not ALLOWED_HOST"
CSRF_TRUSTED_ORIGINS = config('CSRF_TRUSTED_ORIGINS', default=f'https://{DOMAIN}', cast=Csv())
# Application definition

View File

@ -5,7 +5,8 @@ services:
dockerfile: docker/devicehub-django.Dockerfile
environment:
- DEBUG=true
- ALLOWED_HOSTS=*
- DOMAIN=${DOMAIN:-localhost}
- DEMO=${DEMO:-n}
volumes:
- .:/opt/devicehub-django
ports:

View File

@ -9,11 +9,14 @@ set -u
set -x
main() {
if [ "${DETACH:-}" ]; then
detach_arg='-d'
fi
# remove old database
sudo rm -vf db/*
docker compose down
docker compose down -v
docker compose build
docker compose up
docker compose up ${detach_arg:-}
}
main "${@}"

View File

@ -1,4 +1,4 @@
FROM python:3.11.7-slim-bookworm
FROM python:3.11.10-slim-bookworm
# last line is dependencies for weasyprint (for generating pdfs in lafede pilot) https://doc.courtbouillon.org/weasyprint/stable/first_steps.html#debian-11
RUN apt update && \
@ -22,7 +22,8 @@ compile = no
no-cache-dir = True
END
RUN pip install --upgrade pip
# upgrade pip, which might fail on lxc, then remove the "corrupted file"
RUN python -m pip install --upgrade pip || (rm -rf /usr/local/lib/python3.11/site-packages/pip-*.dist-info && python -m pip install --upgrade pip)
COPY ./requirements.txt /opt/devicehub-django
RUN pip install -r requirements.txt

View File

@ -21,21 +21,28 @@ deploy() {
# inspired by https://medium.com/analytics-vidhya/django-with-docker-and-docker-compose-python-part-2-8415976470cc
echo "INFO detected NEW deployment"
./manage.py migrate
./manage.py add_institution example-org
INIT_ORG="${INIT_ORG:-example-org}"
INIT_USER="${INIT_USER:-user@example.org}"
INIT_PASSWD="${INIT_PASSWD:-1234}"
./manage.py add_institution "${INIT_ORG}"
# TODO: one error on add_user, and you don't add user anymore
./manage.py add_user example-org user@example.org 1234
./manage.py add_user "${INIT_ORG}" "${INIT_USER}" "${INIT_PASSWD}"
if [ "${DEMO:-}" ]; then
./manage.py up_snapshots example/snapshots/ "${INIT_USER}"
fi
fi
}
runserver() {
PORT="${PORT:-8000}"
if [ "${DEBUG:-}" = "true" ]; then
if [ "${DEBUG:-}" ]; then
./manage.py runserver 0.0.0.0:${PORT}
else
# TODO
#./manage.py collectstatic
true
if [ "${EXPERIMENTAL:-}" = "true" ]; then
if [ "${EXPERIMENTAL:-}" ]; then
# TODO
# reloading on source code changing is a debugging future, maybe better then use debug
# src https://stackoverflow.com/questions/12773763/gunicorn-autoreload-on-source-change/24893069#24893069

80
docs/es/modelo-datos.md Normal file
View File

@ -0,0 +1,80 @@
Modelo de datos *abstracto* de devicehub que ayuda a tener una idea de cómo funciona
Recordad que por ser este un proyecto de django, se puede obtener de forma automatizada un diagrama de datos con el comando `graph_models` (más adelante vemos de documentar mejor cómo generarlo)
```mermaid
erDiagram
%% los snapshots/placeholders son ficheros de FS inmutables, se insertan en xapian
%% y via su uuid se anotan
%% placeholders también se pueden firmar (como un spnashot, otra fuente)
EVIDENCE {
json obj "its uuid is the PK"
}
USER {
int id PK
string personal-data-etc
}
%% includes the relevant CHID with algorithm for the device build
EVIDENCE_ANNOTATION {
int id PK
uuid uuid "ref evidence (snapshot,placeholder)"
string key
string value
int type "0: sys_deviceid, 1: usr_deviceid, 2: user"
ts created
int owner FK
}
ALGORITHM {
string algorithm
}
%% todas las anotaciones que tienen CHID
%% y su key es un algoritmo de los que tenemos
%% un device es una evaluación
DEVICE {
string CHID
}
DEVICE_ANNOTATION {
string CHID FK
string key
string value
uuid uuid "from last snapshot"
}
LOT {
int id PK
string name
string code "id alt legacy"
string description
bool closed
int owner FK
ts created
ts updated
}
LOT_ANNOTATION {
string id FK
string key
string value
}
SNAPSHOT ||--|| EVIDENCE: "via workbench"
PLACEHOLDER ||--|| EVIDENCE: "via webform"
EVIDENCE ||--|{ EVIDENCE_ANNOTATION: "are interpreted"
USER ||--|{ EVIDENCE_ANNOTATION: "manually entered"
ALGORITHM ||--|{ EVIDENCE_ANNOTATION: "automatically entered"
EVIDENCE_ANNOTATION }|--|{ DEVICE: "aggregates"
DEVICE }|--|{ LOT: "aggregates"
DEVICE ||--|| DEVICE_ANNOTATION: "enriches data"
LOT ||--|| LOT_ANNOTATION: "enriches data"
```

View File

@ -5,6 +5,7 @@ from django.db import models
from utils.constants import STR_SM_SIZE, STR_EXTEND_SIZE, CHASSIS_DH
from evidence.xapian import search
from evidence.parse_details import ParseSnapshot
from user.models import User, Institution
@ -36,6 +37,8 @@ class Evidence:
self.created = None
self.dmi = None
self.annotations = []
self.components = []
self.default = "n/a"
self.get_owner()
self.get_time()
@ -68,7 +71,6 @@ class Evidence:
dmidecode_raw = self.doc["data"]["dmidecode"]
self.dmi = DMIParse(dmidecode_raw)
def get_time(self):
if not self.doc:
self.get_doc()
@ -77,16 +79,31 @@ class Evidence:
if not self.created:
self.created = self.annotations.last().created
def components(self):
return self.doc.get('components', [])
def get_components(self):
if self.doc.get("software") != "EreuseWorkbench":
return self.doc.get('components', [])
self.set_components()
return self.components
def get_manufacturer(self):
if self.doc.get("type") == "WebSnapshot":
kv = self.doc.get('kv', {})
if len(kv) < 1:
return ""
return list(self.doc.get('kv').values())[0]
if self.doc.get("software") != "EreuseWorkbench":
return self.doc['device']['manufacturer']
return self.dmi.manufacturer().strip()
def get_model(self):
if self.doc.get("type") == "WebSnapshot":
kv = self.doc.get('kv', {})
if len(kv) < 2:
return ""
return list(self.doc.get('kv').values())[1]
if self.doc.get("software") != "EreuseWorkbench":
return self.doc['device']['model']
@ -104,11 +121,13 @@ class Evidence:
return k
return ""
@classmethod
def get_all(cls, user):
return Annotation.objects.filter(
owner=user.institution,
type=Annotation.Type.SYSTEM,
).order_by("-created").values_list("uuid", flat=True).distinct()
def set_components(self):
snapshot = ParseSnapshot(self.doc).snapshot_json
self.components = snapshot['components']

View File

@ -5,13 +5,13 @@ import hashlib
from datetime import datetime
from dmidecode import DMIParse
from evidence.models import Annotation
from evidence.xapian import index
from evidence.models import Evidence, Annotation
from utils.constants import ALGOS, CHASSIS_DH
def get_network_cards(child, nets):
if child['id'] == 'network':
if child['id'] == 'network' and "PCI:" in child.get("businfo"):
nets.append(child)
if child.get('children'):
[get_network_cards(x, nets) for x in child['children']]
@ -19,8 +19,12 @@ def get_network_cards(child, nets):
def get_mac(lshw):
nets = []
try:
get_network_cards(json.loads(lshw), nets)
except Exception as ss:
print("WARNING!! {}".format(ss))
return
get_network_cards(json.loads(lshw), nets)
nets_sorted = sorted(nets, key=lambda x: x['businfo'])
# This funcion get the network card integrated in motherboard
# integrate = [x for x in nets if "pci@0000:00:" in x.get('businfo', '')]

493
evidence/parse_details.py Normal file
View File

@ -0,0 +1,493 @@
import json
import numpy as np
from datetime import datetime
from dmidecode import DMIParse
from utils.constants import CHASSIS_DH, DATASTORAGEINTERFACE
def get_lshw_child(child, nets, component):
if child.get('id') == component:
nets.append(child)
if child.get('children'):
[get_lshw_child(x, nets, component) for x in child['children']]
class ParseSnapshot:
def __init__(self, snapshot, default="n/a"):
self.default = default
self.dmidecode_raw = snapshot["data"].get("dmidecode", "{}")
self.smart_raw = snapshot["data"].get("disks", [])
self.hwinfo_raw = snapshot["data"].get("hwinfo", "")
self.lshw_raw = snapshot["data"].get("lshw", {}) or {}
self.lscpi_raw = snapshot["data"].get("lspci", "")
self.device = {"actions": []}
self.components = []
self.monitors = []
self.dmi = DMIParse(self.dmidecode_raw)
self.smart = self.loads(self.smart_raw)
self.lshw = self.loads(self.lshw_raw)
self.hwinfo = self.parse_hwinfo()
self.set_computer()
self.get_hwinfo_monitors()
self.set_components()
self.snapshot_json = {
"type": "Snapshot",
"device": self.device,
"software": snapshot["software"],
"components": self.components,
"uuid": snapshot['uuid'],
"version": snapshot['version'],
"endTime": snapshot["timestamp"],
"elapsed": 1,
}
def set_computer(self):
self.device['manufacturer'] = self.dmi.manufacturer().strip()
self.device['model'] = self.dmi.model().strip()
self.device['serialNumber'] = self.dmi.serial_number()
self.device['type'] = self.get_type()
self.device['sku'] = self.get_sku()
self.device['version'] = self.get_version()
self.device['system_uuid'] = self.get_uuid()
self.device['family'] = self.get_family()
self.device['chassis'] = self.get_chassis_dh()
def set_components(self):
self.get_cpu()
self.get_ram()
self.get_mother_board()
self.get_graphic()
self.get_data_storage()
self.get_display()
self.get_sound_card()
self.get_networks()
def get_cpu(self):
for cpu in self.dmi.get('Processor'):
serial = cpu.get('Serial Number')
if serial == 'Not Specified' or not serial:
serial = cpu.get('ID').replace(' ', '')
self.components.append(
{
"actions": [],
"type": "Processor",
"speed": self.get_cpu_speed(cpu),
"cores": int(cpu.get('Core Count', 1)),
"model": cpu.get('Version'),
"threads": int(cpu.get('Thread Count', 1)),
"manufacturer": cpu.get('Manufacturer'),
"serialNumber": serial,
"brand": cpu.get('Family'),
"address": self.get_cpu_address(cpu),
"bogomips": self.get_bogomips(),
}
)
def get_ram(self):
for ram in self.dmi.get("Memory Device"):
if ram.get('size') == 'No Module Installed':
continue
if not ram.get("Speed"):
continue
self.components.append(
{
"actions": [],
"type": "RamModule",
"size": self.get_ram_size(ram),
"speed": self.get_ram_speed(ram),
"manufacturer": ram.get("Manufacturer", self.default),
"serialNumber": ram.get("Serial Number", self.default),
"interface": ram.get("Type", "DDR"),
"format": ram.get("Form Factor", "DIMM"),
"model": ram.get("Part Number", self.default),
}
)
def get_mother_board(self):
for moder_board in self.dmi.get("Baseboard"):
self.components.append(
{
"actions": [],
"type": "Motherboard",
"version": moder_board.get("Version"),
"serialNumber": moder_board.get("Serial Number", "").strip(),
"manufacturer": moder_board.get("Manufacturer", "").strip(),
"biosDate": self.get_bios_date(),
"ramMaxSize": self.get_max_ram_size(),
"ramSlots": len(self.dmi.get("Memory Device")),
"slots": self.get_ram_slots(),
"model": moder_board.get("Product Name", "").strip(),
"firewire": self.get_firmware_num(),
"pcmcia": self.get_pcmcia_num(),
"serial": self.get_serial_num(),
"usb": self.get_usb_num(),
}
)
def get_graphic(self):
displays = []
get_lshw_child(self.lshw, displays, 'display')
for c in displays:
if not c['configuration'].get('driver', None):
continue
self.components.append(
{
"actions": [],
"type": "GraphicCard",
"memory": self.get_memory_video(c),
"manufacturer": c.get("vendor", self.default),
"model": c.get("product", self.default),
"serialNumber": c.get("serial", self.default),
}
)
def get_memory_video(self, c):
# get info of lspci
# pci_id = c['businfo'].split('@')[1]
# lspci.get(pci_id) | grep size
# lspci -v -s 00:02.0
return None
def get_data_storage(self):
for sm in self.smart:
if sm.get('smartctl', {}).get('exit_status') == 1:
continue
model = sm.get('model_name')
manufacturer = None
if model and len(model.split(" ")) > 1:
mm = model.split(" ")
model = mm[-1]
manufacturer = " ".join(mm[:-1])
self.components.append(
{
"actions": self.sanitize(sm),
"type": self.get_data_storage_type(sm),
"model": model,
"manufacturer": manufacturer,
"serialNumber": sm.get('serial_number'),
"size": self.get_data_storage_size(sm),
"variant": sm.get("firmware_version"),
"interface": self.get_data_storage_interface(sm),
}
)
def sanitize(self, action):
return []
def get_bogomips(self):
if not self.hwinfo:
return self.default
bogomips = 0
for row in self.hwinfo:
for cel in row:
if 'BogoMips' in cel:
try:
bogomips += float(cel.split(":")[-1])
except:
pass
return bogomips
def get_networks(self):
networks = []
get_lshw_child(self.lshw, networks, 'network')
for c in networks:
capacity = c.get('capacity')
wireless = bool(c.get('configuration', {}).get('wireless', False))
self.components.append(
{
"actions": [],
"type": "NetworkAdapter",
"model": c.get('product'),
"manufacturer": c.get('vendor'),
"serialNumber": c.get('serial'),
"speed": capacity,
"variant": c.get('version', 1),
"wireless": wireless or False,
"integrated": "PCI:0000:00" in c.get("businfo", ""),
}
)
def get_sound_card(self):
multimedias = []
get_lshw_child(self.lshw, multimedias, 'multimedia')
for c in multimedias:
self.components.append(
{
"actions": [],
"type": "SoundCard",
"model": c.get('product'),
"manufacturer": c.get('vendor'),
"serialNumber": c.get('serial'),
}
)
def get_display(self): # noqa: C901
TECHS = 'CRT', 'TFT', 'LED', 'PDP', 'LCD', 'OLED', 'AMOLED'
for c in self.monitors:
resolution_width, resolution_height = (None,) * 2
refresh, serial, model, manufacturer, size = (None,) * 5
year, week, production_date = (None,) * 3
for x in c:
if "Vendor: " in x:
manufacturer = x.split('Vendor: ')[-1].strip()
if "Model: " in x:
model = x.split('Model: ')[-1].strip()
if "Serial ID: " in x:
serial = x.split('Serial ID: ')[-1].strip()
if " Resolution: " in x:
rs = x.split(' Resolution: ')[-1].strip()
if 'x' in rs:
resolution_width, resolution_height = [
int(r) for r in rs.split('x')
]
if "Frequencies: " in x:
try:
refresh = int(float(x.split(',')[-1].strip()[:-3]))
except Exception:
pass
if 'Year of Manufacture' in x:
year = x.split(': ')[1]
if 'Week of Manufacture' in x:
week = x.split(': ')[1]
if "Size: " in x:
size = self.get_size_monitor(x)
technology = next((t for t in TECHS if t in c[0]), None)
if year and week:
d = '{} {} 0'.format(year, week)
production_date = datetime.strptime(d, '%Y %W %w').isoformat()
self.components.append(
{
"actions": [],
"type": "Display",
"model": model,
"manufacturer": manufacturer,
"serialNumber": serial,
'size': size,
'resolutionWidth': resolution_width,
'resolutionHeight': resolution_height,
"productionDate": production_date,
'technology': technology,
'refreshRate': refresh,
}
)
def get_hwinfo_monitors(self):
for c in self.hwinfo:
monitor = None
external = None
for x in c:
if 'Hardware Class: monitor' in x:
monitor = c
if 'Driver Info' in x:
external = c
if monitor and not external:
self.monitors.append(c)
def get_size_monitor(self, x):
i = 1 / 25.4
t = x.split('Size: ')[-1].strip()
tt = t.split('mm')
if not tt:
return 0
sizes = tt[0].strip()
if 'x' not in sizes:
return 0
w, h = [int(x) for x in sizes.split('x')]
return "{:.2f}".format(np.sqrt(w**2 + h**2) * i)
def get_cpu_address(self, cpu):
default = 64
for ch in self.lshw.get('children', []):
for c in ch.get('children', []):
if c['class'] == 'processor':
return c.get('width', default)
return default
def get_usb_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "USB" in u.get("Port Type", "").upper()
]
)
def get_serial_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "SERIAL" in u.get("Port Type", "").upper()
]
)
def get_firmware_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "FIRMWARE" in u.get("Port Type", "").upper()
]
)
def get_pcmcia_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "PCMCIA" in u.get("Port Type", "").upper()
]
)
def get_bios_date(self):
return self.dmi.get("BIOS")[0].get("Release Date", self.default)
def get_firmware(self):
return self.dmi.get("BIOS")[0].get("Firmware Revision", '1')
def get_max_ram_size(self):
size = 0
for slot in self.dmi.get("Physical Memory Array"):
capacity = slot.get("Maximum Capacity", '0').split(" ")[0]
size += int(capacity)
return size
def get_ram_slots(self):
slots = 0
for x in self.dmi.get("Physical Memory Array"):
slots += int(x.get("Number Of Devices", 0))
return slots
def get_ram_size(self, ram):
memory = ram.get("Size", "0")
return memory
def get_ram_speed(self, ram):
size = ram.get("Speed", "0")
return size
def get_cpu_speed(self, cpu):
speed = cpu.get('Max Speed', "0")
return speed
def get_sku(self):
return self.dmi.get("System")[0].get("SKU Number", self.default).strip()
def get_version(self):
return self.dmi.get("System")[0].get("Version", self.default).strip()
def get_uuid(self):
return self.dmi.get("System")[0].get("UUID", '').strip()
def get_family(self):
return self.dmi.get("System")[0].get("Family", '')
def get_chassis(self):
return self.dmi.get("Chassis")[0].get("Type", '_virtual')
def get_type(self):
chassis_type = self.get_chassis()
return self.translation_to_devicehub(chassis_type)
def translation_to_devicehub(self, original_type):
lower_type = original_type.lower()
CHASSIS_TYPE = {
'Desktop': [
'desktop',
'low-profile',
'tower',
'docking',
'all-in-one',
'pizzabox',
'mini-tower',
'space-saving',
'lunchbox',
'mini',
'stick',
],
'Laptop': [
'portable',
'laptop',
'convertible',
'tablet',
'detachable',
'notebook',
'handheld',
'sub-notebook',
],
'Server': ['server'],
'Computer': ['_virtual'],
}
for k, v in CHASSIS_TYPE.items():
if lower_type in v:
return k
return self.default
def get_chassis_dh(self):
chassis = self.get_chassis()
lower_type = chassis.lower()
for k, v in CHASSIS_DH.items():
if lower_type in v:
return k
return self.default
def get_data_storage_type(self, x):
# TODO @cayop add more SSDS types
SSDS = ["nvme"]
SSD = 'SolidStateDrive'
HDD = 'HardDrive'
type_dev = x.get('device', {}).get('type')
trim = x.get('trim', {}).get("supported") in [True, "true"]
return SSD if type_dev in SSDS or trim else HDD
def get_data_storage_interface(self, x):
interface = x.get('device', {}).get('protocol', 'ATA')
if interface.upper() in DATASTORAGEINTERFACE:
return interface.upper()
txt = "Sid: {}, interface {} is not in DataStorageInterface Enum".format(
self.sid, interface
)
self.errors("{}".format(err))
def get_data_storage_size(self, x):
return x.get('user_capacity', {}).get('bytes')
def parse_hwinfo(self):
hw_blocks = self.hwinfo_raw.split("\n\n")
return [x.split("\n") for x in hw_blocks]
def loads(self, x):
if isinstance(x, str):
try:
return json.loads(x)
except Exception as ss:
print("WARNING!! {}".format(ss))
return {}
return x
def errors(self, txt=None):
if not txt:
return self._errors
logger.error(txt)
self._errors.append(txt)

View File

@ -11,6 +11,7 @@ from django.views.generic.edit import (
FormView,
)
from dashboard.mixins import DashboardView, Http403
from evidence.models import Evidence, Annotation
from evidence.forms import UploadForm, UserTagForm, ImportForm

View File

@ -10,3 +10,4 @@ pandas==2.2.2
xlrd==2.0.1
odfpy==1.4.1
pytz==2024.2

View File

@ -1,7 +1,9 @@
from uuid import uuid4
from django.core.management.base import BaseCommand
from django.contrib.auth import get_user_model
from user.models import Institution
from lot.models import LotTag
from api.models import Token
User = get_user_model()
@ -32,3 +34,6 @@ class Command(BaseCommand):
)
self.u.set_password(password)
self.u.save()
token = uuid4()
Token.objects.create(token=token, owner=self.u)
print(f"TOKEN: {token}")

View File

@ -1,5 +1,5 @@
from decouple import config
from django.urls import reverse
from django.urls import reverse, reverse_lazy
from django.http import HttpResponse
from django.shortcuts import render
from django.utils.translation import gettext_lazy as _

View File

@ -38,3 +38,11 @@ CHASSIS_DH = {
'Tablet': {'tablet'},
'Virtual': {'_virtual'},
}
DATASTORAGEINTERFACE = [
'ATA',
'USB',
'PCI',
'NVME',
]