Compare commits

..

4 Commits

30 changed files with 510 additions and 498 deletions

View File

@ -1,16 +1,16 @@
DOMAIN=localhost
DEMO=true
# note that with DEBUG=true, logs are more verbose (include tracebacks)
DEBUG=true
ALLOWED_HOSTS=localhost,localhost:8000,127.0.0.1,
DEMO=false
STATIC_ROOT=/tmp/static/
MEDIA_ROOT=/tmp/media/
ALLOWED_HOSTS=localhost,localhost:8000,127.0.0.1,
DOMAIN=localhost
DEBUG=True
EMAIL_HOST="mail.example.org"
EMAIL_HOST_USER="fillme_noreply"
EMAIL_HOST_PASSWORD="fillme_passwd"
EMAIL_PORT=587
EMAIL_USE_TLS=true
EMAIL_USE_TLS=True
EMAIL_BACKEND="django.core.mail.backends.smtp.EmailBackend"
EMAIL_FILE_PATH="/tmp/app-messages"
ENABLE_EMAIL=false

121
README.md
View File

@ -1,123 +1,20 @@
# Device Hub
# INSTALACIÓN:
DeviceHub is an IT Asset Management System focused on reusing devices, created under the [eReuse.org](https://www.ereuse.org) project.
La instalación es muy estándar
## Overview
DeviceHub aims to:
- Provide a common IT Asset Management platform for donors, receivers, and IT professionals.
- Automatically collect, analyze, and share device metadata while ensuring privacy and traceability.
- Integrate with existing IT Asset Management Systems.
- Operate in a decentralized manner.
DeviceHub primarily works with three types of objects:
1. **Devices**: Including computers, smartphones, and their components.
2. **Events**: Actions performed on devices (e.g., Repair, Allocate).
3. **Accounts**: Users who perform events on devices.
## Installation
Assuming a host with debian stable
### Quickstart
For a quick start with dummy data in localhost, DeviceHub can be run directly with docker. To do so, from the root of the project run:
```bash
./docker-reset.sh
```
Note that everytime you perform the `docker-reset.sh` script, all data is lost.
Also there is a demo running in http://demo.ereuse.org/. The token for accessing the instance will be always: `token=5018dd65-9abd-4a62-8896-80f34ac66150`, but the instance will be reset every day at 4 am.
For production needs, review and change .env file properly
## Running from baremetal
### Prerequisites
- Python 3.10
- pip
- virtualenv
Specially when developing, is quite convenient to run DeviceHub from a virtual environment. To start with this deployment, create a virtual environment to isolate our project dependencies:
```bash
python -m venv env
source env/bin/activate
pip install -r requirements.txt
source env/bin/actevate
python install -r requirements.txt
```
### System Dependencies
## IMPORTANT EXTERNAL DEPENDENCIES
#### Xapian
Para arrancarlo es necesario tener el paquete `xapian-bindings` en tu ordenador. No se instala mediante `pip`, así que depende de cada [sistema operativo](https://xapian.org/download).
Now, install the xapian dependencies (xapian library and python bindings)
Luego solo necesitas:
```bash
sudo apt-get install python3-xapian libxapian-dev
```
Allow the virtual environment to use system-installed packages:
```bash
export PYTHONPATH="${PYTHONPATH}:/usr/lib/python3/dist-packages"
./manage.py migrate
./manage.py runserver
```
#### Environment Variables
Now, configure the environment variables. For this, we will expand a `.env` file. You can use the following content as an example:
```source
STATIC_ROOT=/tmp/static/
MEDIA_ROOT=/tmp/media/
ALLOWED_HOSTS=localhost,localhost:8000,127.0.0.1,
DOMAIN=localhost
DEBUG=True
```
Now, expand the environment variables:
```bash
source .env
```
### Migrations
Now, apply migrations
```bash
python manage.py makemigrations
python manage.py migrate
```
Also, we can add some dummy data into the database to play along:
```bash
python manage.py add_institution Pangea
python manage.py add_user Pangea user@example.org 1234
python manage.py up_snapshots example/snapshots/ user@example.org
```
### Run DeviceHub
Finally, we can run the DeviceHub service by running:
```bash
python manage.py runserver
```
### Clean up
To clean up the deployment and start fresh, just delete Django's database:
```bash
rm db/*
```
## License
DeviceHub is released under the [GNU Affero General Public License v3.0](LICENSE).

41
admin/forms.py Normal file
View File

@ -0,0 +1,41 @@
from django import forms
from utils.device import create_annotation, create_doc, create_index
from utils.save_snapshots import move_json, save_in_disk
from django.forms.formsets import BaseFormSet
from django.forms import formset_factory
from django.core.exceptions import ValidationError
class CustomStatusLabelForm(forms.Form):
label_name = forms.CharField(
label="Annotation Name",
max_length=50,
widget=forms.TextInput(attrs={'class': 'form-control'})
)
class CustomStatusValueForm(forms.Form):
label_state = forms.CharField(
label="Possible State",
max_length=50,
widget=forms.TextInput(attrs={'class': 'form-control'})
)
class CustomStatusValueFormSet(BaseFormSet):
"""Validation for inputs (no two values should be the same)"""
def clean(self):
if any(self.errors):
return
label = []
labels = []
for form in self.forms:
label = form.cleaned_data.get('label_state')
if label:
if label in labels:
raise ValidationError("Duplicate labels are not allowed.")
labels.append(label)
CustomStatusFormSet = formset_factory(CustomStatusValueForm ,formset = CustomStatusValueFormSet)

View File

@ -10,9 +10,15 @@
<div class="row">
<div class="col">
<a href="{% url 'admin:institution' user.institution.pk %}" class="btn btn-green-admin">
{% translate "Institution" %}
</a>
<a href="{% url 'admin:reserved'%}" class="btn btn-green-admin">
{% translate "Reserved Annotations" %}
</a>
</div>
</div>

View File

@ -0,0 +1,130 @@
{% extends "base.html" %}
{% load i18n %}
{% load django_bootstrap5 %}
{% block content %}
<div class="container mt-5">
<div class="row mb-4">
<div class="col">
<h3 class="text-center">{{ subtitle }}</h3>
</div>
</div>
<form role="form" method="post" novalidate>
{% csrf_token %}
<div class="mb-3">
<h5 class="mt-4">{% translate "Status name" %}</h5>
{% bootstrap_field form.label_name show_label=False %}
</div>
<h5 class="mt-4">{% translate "Possible States" %}</h5>
<div id="formset">
{{ formset.management_form }}
{% for form in formset %}
<div class="row mb-3 formset-form">
<div class="col-md-10">
{% bootstrap_field form.label_state show_label=False %}
</div>
<div class="col-md-2 d-flex align-items-center">
{% if forloop.first %}
<button type="button" class="btn btn-sm btn-success add-form">
&#43;
</button>
{% endif %}
<button type="button" class="btn btn-sm btn-danger remove-form ms-2">
&minus;
</button>
</div>
</div>
{% endfor %}
</div>
<div class="container">
<a class="btn btn-grey" href="{% url 'admin:panel' %}">{% translate "Cancel" %}</a>
<input class="btn btn-green-admin" type="submit" name="submit" value="{% translate 'Save' %}" />
</div>
</form>
</div>
<script>
//TODO: change this to jquery formset plugin
document.addEventListener('DOMContentLoaded', function() {
var formsetDiv = document.getElementById('formset');
var totalForms = document.getElementById('id_form-TOTAL_FORMS');
function updateElementIndex(el, prefix, index) {
var idRegex = new RegExp('(' + prefix + '-\\d+-)');
var replacement = prefix + '-' + index + '-';
if (el.id) el.id = el.id.replace(idRegex, replacement);
if (el.name) el.name = el.name.replace(idRegex, replacement);
}
function addForm(e) {
e.preventDefault();
var formCount = parseInt(totalForms.value);
var newForm = document.querySelector('.formset-form').cloneNode(true);
newForm.querySelectorAll('input').forEach(function(input) {
updateElementIndex(input, 'form', formCount);
input.value = '';
});
// Remove any hidden inputs (management form fields)
newForm.querySelectorAll('input[type="hidden"]').forEach(function(hiddenInput) {
hiddenInput.remove();
});
// Attach event listeners to the new buttons
var addButton = newForm.querySelector('.add-form');
var removeButton = newForm.querySelector('.remove-form');
addButton.addEventListener('click', addForm);
removeButton.addEventListener('click', removeForm);
// Ensure only the first form has the add button
formsetDiv.querySelectorAll('.add-form').forEach(function(button, index) {
if (index > 0) {
button.remove();
}
});
formsetDiv.appendChild(newForm);
totalForms.value = formCount + 1;
}
function removeForm(e) {
e.preventDefault();
var formToRemove = e.target.closest('.formset-form');
formToRemove.remove();
var forms = formsetDiv.querySelectorAll('.formset-form');
totalForms.value = forms.length;
// Re-index form elements
forms.forEach(function(form, index) {
form.querySelectorAll('input').forEach(function(input) {
updateElementIndex(input, 'form', index);
});
});
// Ensure only the first form has the add button
formsetDiv.querySelectorAll('.add-form').forEach(function(button, index) {
if (index > 0) {
button.remove();
}
});
}
// Initial event listeners
document.querySelectorAll('.add-form').forEach(function(button) {
button.addEventListener('click', addForm);
});
document.querySelectorAll('.remove-form').forEach(function(button) {
button.addEventListener('click', removeForm);
});
});
</script>
{% endblock %}

View File

@ -10,4 +10,5 @@ urlpatterns = [
path("users/edit/<int:pk>", views.EditUserView.as_view(), name="edit_user"),
path("users/delete/<int:pk>", views.DeleteUserView.as_view(), name="delete_user"),
path("institution/<int:pk>", views.InstitutionView.as_view(), name="institution"),
path("reserved", views.AddReservedAnnotationView.as_view(), name="reserved"),
]

View File

@ -8,9 +8,16 @@ from django.views.generic.edit import (
UpdateView,
DeleteView,
)
from django.views.generic import FormView
import logging
from dashboard.mixins import DashboardView, Http403
from user.models import User, Institution
from admin.email import NotifyActivateUserByEmail
from admin.forms import CustomStatusLabelForm, CustomStatusValueForm, CustomStatusFormSet
from evidence.models import Annotation, AllowedValue
import uuid
logger = logging.getLogger('dhub')
class AdminView(DashboardView):
@ -124,3 +131,70 @@ class InstitutionView(AdminView, UpdateView):
self.object = self.request.user.institution
kwargs = super().get_form_kwargs()
return kwargs
class AddReservedAnnotationView(AdminView, FormView):
template_name = "reserved.html"
title = _("New Custom State Labels")
breadcrumb = "Admin / Custom State Labels (new name?)"
success_url = reverse_lazy('admin:panel')
form_class = CustomStatusLabelForm
formset_class = CustomStatusFormSet
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
if self.request.POST:
context['form'] = self.form_class(self.request.POST)
context['formset'] = self.formset_class(self.request.POST)
else:
context['form'] = self.form_class()
context['formset'] = self.formset_class()
context['subtitle'] = _("Add Custom Status Label")
return context
def form_valid(self, form):
context = self.get_context_data()
formset = context['formset']
form = context['form']
if form.is_valid():
label_name = form.cleaned_data['label_name']
else:
return self.form_invalid(form)
if formset.is_valid():
annotation = Annotation.objects.create(
uuid=uuid.uuid4(),
owner=self.request.user.institution,
user=self.request.user,
type=Annotation.Type.ADMIN,
key=label_name,
value=label_name
)
first_state = None
for form in formset:
state = form.cleaned_data.get('label_state')
if state and not first_state:
first_state = state
annotation.value = state
annotation.save()
if state:
AllowedValue.objects.create(
annotation=annotation,
value=state
)
logger.info("Saving custom label to db: " + label_name)
self.success_message = _("Custom status label has been added.")
self.success_url = reverse_lazy('admin:panel')
return super().form_valid(form)
else:
logger.error("Formset is not valid")
logger.error(formset.errors)
return self.form_invalid(form)

View File

@ -5,7 +5,6 @@ import logging
from uuid import uuid4
from django.urls import reverse_lazy
from django.conf import settings
from django.http import JsonResponse
from django.shortcuts import get_object_or_404, redirect
from django.utils.translation import gettext_lazy as _
@ -42,20 +41,20 @@ class ApiMixing(View):
# Authentication
auth_header = self.request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
logger.error("Invalid or missing token %s", auth_header)
logger.exception("Invalid or missing token {}".format(auth_header))
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
token = auth_header.split(' ')[1].strip("'").strip('"')
try:
uuid.UUID(token)
except Exception:
logger.error("Invalid or missing token %s", token)
logger.exception("Invalid token {}".format(token))
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
self.tk = Token.objects.filter(token=token).first()
if not self.tk:
logger.error("Invalid or missing token %s", token)
logger.exception("Invalid or missing token {}".format(token))
return JsonResponse({'error': 'Invalid or missing token'}, status=401)
@ -73,8 +72,7 @@ class NewSnapshotView(ApiMixing):
try:
data = json.loads(request.body)
except json.JSONDecodeError:
txt = "error: the snapshot is not a json"
logger.error("%s", txt)
logger.exception("Invalid Snapshot of user {}".format(self.tk.owner))
return JsonResponse({'error': 'Invalid JSON'}, status=500)
# Process snapshot
@ -87,7 +85,7 @@ class NewSnapshotView(ApiMixing):
if not data.get("uuid"):
txt = "error: the snapshot not have uuid"
logger.error("%s", txt)
logger.exception(txt)
return JsonResponse({'status': txt}, status=500)
exist_annotation = Annotation.objects.filter(
@ -96,20 +94,15 @@ class NewSnapshotView(ApiMixing):
if exist_annotation:
txt = "error: the snapshot {} exist".format(data['uuid'])
logger.warning("%s", txt)
logger.exception(txt)
return JsonResponse({'status': txt}, status=500)
try:
Build(data, self.tk.owner)
except Exception as err:
if settings.DEBUG:
logger.exception("%s", err)
snapshot_id = data.get("uuid", "")
txt = "It is not possible to parse snapshot: %s."
logger.error(txt, snapshot_id)
text = "fail: It is not possible to parse snapshot"
return JsonResponse({'status': text}, status=500)
logger.exception(err)
return JsonResponse({'status': f"fail: {err}"}, status=500)
annotation = Annotation.objects.filter(
uuid=data['uuid'],
@ -121,7 +114,7 @@ class NewSnapshotView(ApiMixing):
if not annotation:
logger.error("Error: No annotation for uuid: %s", data["uuid"])
logger.exception("Error: No annotation for uuid: {}".format(data["uuid"]))
return JsonResponse({'status': 'fail'}, status=500)
url_args = reverse_lazy("device:details", args=(annotation.value,))
@ -293,7 +286,7 @@ class AddAnnotationView(ApiMixing):
key = data["key"]
value = data["value"]
except Exception:
logger.error("Invalid Snapshot of user %s", self.tk.owner)
logger.exception("Invalid Snapshot of user {}".format(self.tk.owner))
return JsonResponse({'error': 'Invalid JSON'}, status=500)
Annotation.objects.create(

View File

@ -29,7 +29,7 @@ class Device:
self.shortid = self.pk[:6].upper()
self.algorithm = None
self.owner = None
self.annotations = []
self.annotations = []
self.hids = []
self.uuids = []
self.evidences = []
@ -108,7 +108,7 @@ class Device:
return
annotation = annotations.first()
self.last_evidence = Evidence(annotation.uuid)
def is_eraseserver(self):
if not self.uuids:
self.get_uuids()
@ -120,7 +120,7 @@ class Device:
owner=self.owner,
type=Annotation.Type.ERASE_SERVER
).first()
if annotation:
return True
return False
@ -129,8 +129,7 @@ class Device:
return self.uuids[0]
def get_lots(self):
self.lots = [
x.lot for x in DeviceLot.objects.filter(device_id=self.id)]
self.lots = [x.lot for x in DeviceLot.objects.filter(device_id=self.id)]
@classmethod
def get_unassigned(cls, institution, offset=0, limit=None):
@ -180,6 +179,7 @@ class Device:
count = cls.get_unassigned_count(institution)
return devices, count
@classmethod
def get_unassigned_count(cls, institution):
@ -279,12 +279,6 @@ class Device:
self.get_last_evidence()
return self.last_evidence.get_manufacturer()
@property
def serial_number(self):
if not self.last_evidence:
self.get_last_evidence()
return self.last_evidence.get_serial_number()
@property
def type(self):
if self.last_evidence.doc['type'] == "WebSnapshot":

View File

@ -58,7 +58,7 @@
<div class="col-lg-9 col-md-8">{{ object.type }}</div>
</div>
{% if object.is_websnapshot and object.last_user_evidence %}
{% if object.is_websnapshot %}
{% for k, v in object.last_user_evidence %}
<div class="row mb-3">
<div class="col-lg-3 col-md-4 label">{{ k }}</div>
@ -84,7 +84,7 @@
<div class="col-lg-3 col-md-4 label">
{% trans 'Serial Number' %}
</div>
<div class="col-lg-9 col-md-8">{{ object.serial_number|default:'' }}</div>
<div class="col-lg-9 col-md-8">{{ object.last_evidence.doc.device.serialNumber|default:'' }}</div>
</div>
{% endif %}

View File

@ -120,12 +120,10 @@
<div class="col-md-4 info-label">Model</div>
<div class="col-md-8 info-value">{{ object.model|default:'' }}</div>
</div>
{% if user.is_authenticated %}
<div class="info-row row">
<div class="col-md-4 info-label">Serial Number</div>
<div class="col-md-8 info-value">{{ object.serial_number|default:'' }}</div>
</div>
{% endif %}
<div class="info-row row">
<div class="col-md-4 info-label">Serial Number</div>
<div class="col-md-8 info-value">{{ object.last_evidence.doc.device.serialNumber|default:'' }}</div>
</div>
{% endif %}
</div>
@ -138,6 +136,7 @@
{% endfor %}
</div>
</div>
<h2 class="section-title mt-5">Components</h2>
<div class="row">
{% for component in object.components %}
@ -148,9 +147,7 @@
<p class="card-text">
{% for component_key, component_value in component.items %}
{% if component_key not in 'actions,type' %}
{% if component_key != 'serialNumber' or user.is_authenticated %}
<strong>{{ component_key }}:</strong> {{ component_value }}<br />
{% endif %}
<strong>{{ component_key }}:</strong> {{ component_value }}<br />
{% endif %}
{% endfor %}
</p>
@ -160,9 +157,10 @@
{% endfor %}
</div>
</div>
<footer>
<p>
&copy;{% now 'Y' %}eReuse. All rights reserved.
&copy;{% now 'Y' %} eReuse. All rights reserved.
</p>
</footer>

View File

@ -3,47 +3,74 @@ from unittest.mock import MagicMock
class TestDevice(Device):
def __init__(self, id):
super().__init__(id=id)
self.shortid = id[:6].upper()
self.uuids = []
self.hids = ['hid1', 'hid2']
self._setup_evidence()
"""A test subclass of Device that overrides the database-dependent methods"""
# TODO Leaving commented bc not used, but might be useful at some point
# def get_annotations(self):
# """Return empty list instead of querying database"""
# return []
def _setup_evidence(self):
self._evidence = MagicMock()
self._evidence.doc = {
'type': 'Computer',
'manufacturer': 'Test Manufacturer',
'model': 'Test Model',
'device': {
'serialNumber': 'SN123456',
'type': 'Computer'
# def get_uuids(self):
# """Set uuids directly instead of querying"""
# self.uuids = ['uuid1', 'uuid2']
# def get_hids(self):
# """Set hids directly instead of querying"""
# self.hids = ['hid1', 'hid2']
# def get_evidences(self):
# """Set evidences directly instead of querying"""
# self.evidences = []
# def get_lots(self):
# """Set lots directly instead of querying"""
# self.lots = []
def get_last_evidence(self):
if not hasattr(self, '_evidence'):
self._evidence = MagicMock()
self._evidence.doc = {
'type': 'Computer',
'manufacturer': 'Test Manufacturer',
'model': 'Test Model',
'device': {
'serialNumber': 'SN123456',
'type': 'Computer'
}
}
}
self._evidence.get_manufacturer = lambda: 'Test Manufacturer'
self._evidence.get_model = lambda: 'Test Model'
self._evidence.get_chassis = lambda: 'Computer'
self._evidence.get_components = lambda: [
{
'type': 'CPU',
'model': 'Intel i7',
'manufacturer': 'Intel',
'serialNumber': 'SN12345678'
},
{
'type': 'RAM',
'size': '8GB',
'manufacturer': 'Kingston',
'serialNumber': 'SN87654321'
}
]
self._evidence.get_manufacturer = lambda: 'Test Manufacturer'
self._evidence.get_model = lambda: 'Test Model'
self._evidence.get_chassis = lambda: 'Computer'
self._evidence.get_components = lambda: [
{
'type': 'CPU',
'model': 'Intel i7',
'manufacturer': 'Intel'
},
{
'type': 'RAM',
'size': '8GB',
'manufacturer': 'Kingston'
}
]
self.last_evidence = self._evidence
@property
def components(self):
return self.last_evidence.get_components()
@property
def serial_number(self):
return self.last_evidence.doc['device']['serialNumber']
class TestWebSnapshotDevice(TestDevice):
"""A test subclass of Device that simulates a WebSnapshot device"""
def get_last_evidence(self):
if not hasattr(self, '_evidence'):
self._evidence = MagicMock()
self._evidence.doc = {
'type': 'WebSnapshot',
'kv': {
'URL': 'http://example.com',
'Title': 'Test Page',
'Timestamp': '2024-01-01'
},
'device': {
'type': 'Laptop'
}
}
self.last_evidence = self._evidence
return self._evidence

View File

@ -2,8 +2,7 @@ from django.test import TestCase, Client
from django.urls import reverse
from unittest.mock import patch
from device.views import PublicDeviceWebView
from device.tests.test_mock_device import TestDevice
from user.models import User, Institution
from device.tests.test_mock_device import TestDevice, TestWebSnapshotDevice
class PublicDeviceWebViewTests(TestCase):
@ -12,99 +11,57 @@ class PublicDeviceWebViewTests(TestCase):
self.test_id = "test123"
self.test_url = reverse('device:device_web',
kwargs={'pk': self.test_id})
self.institution = Institution.objects.create(
name="Test Institution"
)
self.user = User.objects.create_user(
email='test@example.com',
institution=self.institution,
password='testpass123'
)
def test_url_resolves_correctly(self):
"""Test that the URL is constructed correctly"""
url = reverse('device:device_web', kwargs={'pk': self.test_id})
self.assertEqual(url, f'/device/{self.test_id}/public/')
@patch('device.views.Device')
def test_html_response_anonymous(self, MockDevice):
def test_html_response(self, MockDevice):
test_device = TestDevice(id=self.test_id)
MockDevice.return_value = test_device
response = self.client.get(self.test_url)
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, 'device_web.html')
self.assertContains(response, 'Test Manufacturer')
self.assertContains(response, 'Test Model')
self.assertContains(response, 'Computer')
self.assertContains(response, self.test_id)
self.assertNotContains(response, 'Serial Number')
self.assertNotContains(response, 'serialNumber')
@patch('device.views.Device')
def test_html_response_authenticated(self, MockDevice):
test_device = TestDevice(id=self.test_id)
MockDevice.return_value = test_device
self.client.login(username='test@example.com', password='testpass123')
response = self.client.get(self.test_url)
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, 'device_web.html')
self.assertContains(response, 'Test Manufacturer')
self.assertContains(response, 'Test Model')
self.assertContains(response, 'Computer')
self.assertContains(response, self.test_id)
self.assertContains(response, 'Serial Number')
self.assertContains(response, 'Components')
self.assertContains(response, 'CPU')
self.assertContains(response, 'Intel')
self.assertContains(response, 'RAM')
self.assertContains(response, 'Kingston')
@patch('device.views.Device')
def test_json_response_anonymous(self, MockDevice):
def test_json_response(self, MockDevice):
test_device = TestDevice(id=self.test_id)
MockDevice.return_value = test_device
response = self.client.get(
self.test_url,
HTTP_ACCEPT='application/json'
)
self.assertEqual(response.status_code, 200)
self.assertEqual(response['Content-Type'], 'application/json')
json_data = response.json()
self.assertEqual(json_data['id'], self.test_id)
self.assertEqual(json_data['shortid'], self.test_id[:6].upper())
self.assertEqual(json_data['uuids'], [])
self.assertEqual(json_data['hids'], ['hid1', 'hid2'])
self.assertNotIn('serial_number', json_data)
self.assertNotIn('serialNumber', json_data)
self.assertEqual(json_data['components'], test_device.components)
@patch('device.views.Device')
def test_json_response_authenticated(self, MockDevice):
test_device = TestDevice(id=self.test_id)
def test_websnapshot_device(self, MockDevice):
test_device = TestWebSnapshotDevice(id=self.test_id)
MockDevice.return_value = test_device
self.client.login(username='test@example.com', password='testpass123')
response = self.client.get(
self.test_url,
HTTP_ACCEPT='application/json'
)
self.assertEqual(response.status_code, 200)
self.assertEqual(response['Content-Type'], 'application/json')
response = self.client.get(self.test_url)
json_data = response.json()
self.assertEqual(json_data['id'], self.test_id)
self.assertEqual(json_data['shortid'], self.test_id[:6].upper())
self.assertEqual(json_data['components'], [
{
'type': 'CPU',
'model': 'Intel i7',
'manufacturer': 'Intel',
'serialNumber': 'SN12345678'
},
{
'type': 'RAM',
'size': '8GB',
'manufacturer': 'Kingston',
'serialNumber': 'SN87654321'
}
])
self.assertEqual(json_data['serial_number'], 'SN123456')
self.assertEqual(json_data['uuids'], [])
self.assertEqual(json_data['hids'], ['hid1', 'hid2'])
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, 'device_web.html')
self.assertContains(response, 'http://example.com')
self.assertContains(response, 'Test Page')

View File

@ -117,10 +117,10 @@ class PublicDeviceWebView(TemplateView):
def get(self, request, *args, **kwargs):
self.pk = kwargs['pk']
self.object = Device(id=self.pk)
if not self.object.last_evidence:
raise Http404
if self.request.headers.get('Accept') == 'application/json':
return self.get_json_response()
return super().get(request, *args, **kwargs)
@ -133,38 +133,15 @@ class PublicDeviceWebView(TemplateView):
})
return context
@property
def public_fields(self):
return {
def get_json_response(self):
data = {
'id': self.object.id,
'shortid': self.object.shortid,
'uuids': self.object.uuids,
'hids': self.object.hids,
'components': self.remove_serial_number_from(self.object.components),
'components': self.object.components
}
@property
def authenticated_fields(self):
return {
'serial_number': self.object.serial_number,
'components': self.object.components,
}
def remove_serial_number_from(self, components):
for component in components:
if 'serial_number' in component:
del component['SerialNumber']
return components
def get_device_data(self):
data = self.public_fields
if self.request.user.is_authenticated:
data.update(self.authenticated_fields)
return data
def get_json_response(self):
device_data = self.get_device_data()
return JsonResponse(device_data)
return JsonResponse(data)
class AddAnnotationView(DashboardView, CreateView):

View File

@ -17,8 +17,6 @@ from pathlib import Path
from django.contrib.messages import constants as messages
from decouple import config, Csv
from utils.logger import CustomFormatter
# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
@ -34,6 +32,8 @@ DEBUG = config('DEBUG', default=False, cast=bool)
DOMAIN = config("DOMAIN")
assert DOMAIN not in [None, ''], "DOMAIN var is MANDATORY"
# this var is very important, we print it
print("DOMAIN: " + DOMAIN)
ALLOWED_HOSTS = config('ALLOWED_HOSTS', default=DOMAIN, cast=Csv())
assert DOMAIN in ALLOWED_HOSTS, f"DOMAIN {DOMAIN} is not in ALLOWED_HOSTS {ALLOWED_HOSTS}"
@ -205,34 +205,12 @@ LOGOUT_REDIRECT_URL = '/'
LOGGING = {
"version": 1,
"disable_existing_loggers": False,
'formatters': {
'colored': {
'()': CustomFormatter,
'format': '%(levelname)s %(asctime)s %(message)s'
},
},
"handlers": {
"console": {
"level": "DEBUG",
"class": "logging.StreamHandler",
"formatter": "colored"
},
"console": {"level": "DEBUG", "class": "logging.StreamHandler"},
},
"root": {
"handlers": ["console"],
"level": "DEBUG",
},
"loggers": {
"django": {
"handlers": ["console"],
"level": "INFO",
"propagate": False, # Asegura que no se reenvíen a los manejadores raíz
},
"django.request": {
"handlers": ["console"],
"level": "ERROR",
"propagate": False,
}
}
}

View File

@ -4,7 +4,7 @@ services:
build:
dockerfile: docker/devicehub-django.Dockerfile
environment:
- DEBUG=${DEBUG:-false}
- DEBUG=true
- DOMAIN=${DOMAIN:-localhost}
- ALLOWED_HOSTS=${ALLOWED_HOSTS:-$DOMAIN}
- DEMO=${DEMO:-false}

View File

@ -14,11 +14,6 @@ main() {
if [ "${DETACH:-}" ]; then
detach_arg='-d'
fi
if [ ! -f .env ]; then
cp -v .env.example .env
echo "WARNING: .env was not there, .env.example was copied, this only happens once"
fi
# remove old database
sudo rm -vfr ./db/*
docker compose down -v

View File

@ -18,8 +18,6 @@ deploy() {
if [ "${DEBUG:-}" = 'true' ]; then
./manage.py print_settings
else
echo "DOMAIN: ${DOMAIN}"
fi
# detect if existing deployment (TODO only works with sqlite)

View File

@ -15,7 +15,7 @@ from utils.save_snapshots import move_json, save_in_disk
class UploadForm(forms.Form):
evidence_file = MultipleFileField(label=_("File"))
def clean_evidence_file(self):
def clean(self):
self.evidences = []
data = self.cleaned_data.get('evidence_file')
if not data:
@ -33,20 +33,13 @@ class UploadForm(forms.Form):
exist_annotation = Annotation.objects.filter(
uuid=file_json['uuid']
).first()
if exist_annotation:
raise ValidationError(
_("The snapshot already exists"),
code="duplicate_snapshot",
)
#Catch any error and display it as Validation Error so the Form handles it
except Exception as e:
raise ValidationError(
_("Error on '%(file_name)s': %(error)s"),
code="error",
params={"file_name": file_name, "error": getattr(e, 'message', str(e))},
)
raise ValidationError("error: {} exist".format(file_name))
except Exception:
raise ValidationError("error in: {}".format(file_name))
self.evidences.append((file_name, file_json))
return True
@ -130,15 +123,7 @@ class ImportForm(forms.Form):
data = self.cleaned_data["file_import"]
self.file_name = data.name
try:
df = pd.read_excel(data)
except Exception as e:
raise ValidationError(
_("Error on '%(file_name)s': Invalid File"),
params={"file_name": self.file_name}
)
df = pd.read_excel(data)
df.fillna('', inplace=True)
data_pd = df.to_dict(orient='index')

View File

@ -36,8 +36,10 @@ class Command(BaseCommand):
continue
user = institution.user_set.filter(is_admin=True).first()
if not user:
txt = "No there are Admins for the institution: %s"
logger.warning(txt, institution.name)
txt = "Error No there are Admins for the institution: {}".format(
institution.name
)
logger.exception(txt)
continue
snapshots_path = os.path.join(filepath, "snapshots")
@ -72,12 +74,13 @@ class Command(BaseCommand):
create_index(s, user)
create_annotation(s, user, commit=True)
except Exception as err:
txt = "In placeholder %s \n%s"
logger.warning(txt, f_path, err)
txt = "Error: in placeholder {} \n{}".format(f_path, err)
logger.exception(txt)
def build_snapshot(self, s, user, f_path):
try:
Build(s, user)
except Exception:
txt = "Error: in Snapshot {}".format(f_path)
logger.error(txt)
except Exception as err:
txt = "Error: in Snapshot {} \n{}".format(f_path, err)
logger.exception(txt)

View File

@ -4,9 +4,6 @@ import logging
from django.core.management.base import BaseCommand
from django.contrib.auth import get_user_model
from django.conf import settings
from utils.save_snapshots import move_json, save_in_disk
from evidence.parse import Build
@ -39,7 +36,7 @@ class Command(BaseCommand):
self.read_directory(path)
self.parsing()
def read_directory(self, directory):
for filename in os.listdir(directory):
filepath = os.path.join(directory, filename)
@ -47,22 +44,14 @@ class Command(BaseCommand):
self.open(filepath)
def open(self, filepath):
try:
with open(filepath, 'r') as file:
content = json.loads(file.read())
path_name = save_in_disk(content, self.user.institution.name)
self.snapshots.append((content, path_name))
except Exception as e:
logger.error("Could not open file %s: %s", filepath, e)
with open(filepath, 'r') as file:
content = json.loads(file.read())
self.snapshots.append(content)
def parsing(self):
for s, p in self.snapshots:
for s in self.snapshots:
try:
self.devices.append(Build(s, self.user))
move_json(p, self.user.institution.name)
except Exception as e:
snapshot_id = s.get("uuid", "")
txt = "Could not parse snapshot %s: %s"
logger.error(txt, snapshot_id, e)
except Exception as err:
logger.exception(err)

View File

@ -11,26 +11,50 @@ from user.models import User, Institution
class Annotation(models.Model):
class Type(models.IntegerChoices):
SYSTEM = 0, "System"
SYSTEM= 0, "System"
USER = 1, "User"
DOCUMENT = 2, "Document"
ERASE_SERVER = 3, "EraseServer"
ADMIN = 4, "Admin"
created = models.DateTimeField(auto_now_add=True)
uuid = models.UUIDField()
owner = models.ForeignKey(Institution, on_delete=models.CASCADE)
user = models.ForeignKey(
User, on_delete=models.SET_NULL, null=True, blank=True)
type = models.SmallIntegerField(choices=Type)
user = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True)
type = models.SmallIntegerField(choices=Type)
key = models.CharField(max_length=STR_EXTEND_SIZE)
value = models.CharField(max_length=STR_EXTEND_SIZE)
class Meta:
constraints = [
models.UniqueConstraint(
fields=["type", "key", "uuid"], name="unique_type_key_uuid")
models.UniqueConstraint(fields=["type", "key", "uuid"], name="unique_type_key_uuid")
]
#TODO: check if this works properly
def clean(self):
super().clean()
if self.type == self.Type.ADMIN:
if Annotation.objects.filter(type=self.Type.ADMIN, key=self.key).exists():
raise ValidationError(f"The key '{self.key}' is already reserved by admin.")
else:
if Annotation.objects.filter(type=self.Type.ADMIN, key=self.key).exists():
raise ValidationError(f"The key '{self.key}' is reserved by admin and cannot be used.")
class AllowedValue(models.Model):
annotation = models.ForeignKey(Annotation, on_delete=models.CASCADE, limit_choices_to={'type': Annotation.Type.ADMIN})
value = models.CharField(max_length=50)
def __str__(self):
return self.value
#This represents a change on a System-type Annotation
class Action(models.Model):
annotation = models.ForeignKey(Annotation, on_delete=models.CASCADE, limit_choices_to={'type': Annotation.Type.ADMIN})
created = models.DateTimeField(auto_now_add=True)
owner = models.ForeignKey(Institution, on_delete=models.CASCADE)
user = models.ForeignKey(User, on_delete=models.SET_NULL, null=True, blank=True)
comment = models.CharField(max_length=250)
class Evidence:
def __init__(self, uuid):
@ -39,8 +63,8 @@ class Evidence:
self.doc = None
self.created = None
self.dmi = None
self.annotations = []
self.components = []
self.annotations = []
self.components = []
self.default = "n/a"
self.get_owner()
@ -89,7 +113,7 @@ class Evidence:
return self.components
def get_manufacturer(self):
if self.is_web_snapshot():
if self.doc.get("type") == "WebSnapshot":
kv = self.doc.get('kv', {})
if len(kv) < 1:
return ""
@ -101,7 +125,7 @@ class Evidence:
return self.dmi.manufacturer().strip()
def get_model(self):
if self.is_web_snapshot():
if self.doc.get("type") == "WebSnapshot":
kv = self.doc.get('kv', {})
if len(kv) < 2:
return ""
@ -124,11 +148,6 @@ class Evidence:
return k
return ""
def get_serial_number(self):
if self.is_legacy():
return self.doc['device']['serialNumber']
return self.dmi.serial_number().strip()
@classmethod
def get_all(cls, user):
return Annotation.objects.filter(
@ -143,6 +162,3 @@ class Evidence:
def is_legacy(self):
return self.doc.get("software") != "workbench-script"
def is_web_snapshot(self):
return self.doc.get("type") == "WebSnapshot"

View File

@ -4,7 +4,6 @@ import logging
from dmidecode import DMIParse
from json_repair import repair_json
from evidence.parse_details import get_lshw_child
from evidence.models import Annotation
from evidence.xapian import index
@ -13,7 +12,16 @@ from utils.constants import CHASSIS_DH
logger = logging.getLogger('django')
def get_network_cards(child, nets):
if child['id'] == 'network' and "PCI:" in child.get("businfo"):
nets.append(child)
if child.get('children'):
[get_network_cards(x, nets) for x in child['children']]
def get_mac(lshw):
nets = []
try:
if type(lshw) is dict:
hw = lshw
@ -22,16 +30,18 @@ def get_mac(lshw):
except json.decoder.JSONDecodeError:
hw = json.loads(repair_json(lshw))
nets = []
get_lshw_child(hw, nets, 'network')
try:
get_network_cards(hw, nets)
except Exception as ss:
print("WARNING!! {}".format(ss))
return
nets_sorted = sorted(nets, key=lambda x: x['businfo'])
# This funcion get the network card integrated in motherboard
# integrate = [x for x in nets if "pci@0000:00:" in x.get('businfo', '')]
if nets_sorted:
mac = nets_sorted[0]['serial']
logger.debug("The snapshot has the following MAC: %s" , mac)
return mac
return nets_sorted[0]['serial']
class Build:
@ -80,8 +90,8 @@ class Build:
)
if annotation:
txt = "Warning: Snapshot %s already registered (annotation exists)"
logger.warning(txt, self.uuid)
txt = "Warning: Snapshot {} exist as annotation !!".format(self.uuid)
logger.exception(txt)
return
for k, v in self.algorithms.items():
@ -125,7 +135,9 @@ class Build:
# mac = get_mac2(hwinfo_raw) or ""
mac = get_mac(lshw) or ""
if not mac:
txt = "Could not retrieve MAC address in snapshot %s"
logger.warning(txt, snapshot['uuid'])
print(f"WARNING: Could not retrieve MAC address in snapshot {snapshot['uuid']}" )
# TODO generate system annotation for that snapshot
else:
print(f"{manufacturer}{model}{chassis}{serial_number}{sku}{mac}")
return f"{manufacturer}{model}{chassis}{serial_number}{sku}{mac}"

View File

@ -1,5 +1,4 @@
import json
import logging
import numpy as np
from datetime import datetime
@ -9,9 +8,6 @@ from json_repair import repair_json
from utils.constants import CHASSIS_DH, DATASTORAGEINTERFACE
logger = logging.getLogger('django')
def get_lshw_child(child, nets, component):
if child.get('id') == component:
nets.append(child)
@ -487,12 +483,12 @@ class ParseSnapshot:
if isinstance(x, str):
try:
try:
hw = json.loads(x)
hw = json.loads(lshw)
except json.decoder.JSONDecodeError:
hw = json.loads(repair_json(x))
hw = json.loads(repair_json(lshw))
return hw
except Exception as ss:
logger.warning("%s", ss)
print("WARNING!! {}".format(ss))
return {}
return x
@ -501,5 +497,5 @@ class ParseSnapshot:
return self._errors
logger.error(txt)
self._errors.append("%s", txt)
self._errors.append(txt)

View File

@ -29,44 +29,26 @@
<div class="tab-content pt-2">
<div class="tab-pane fade show active" id="device">
<h5 class="card-title"></h5>
<h5 class="card-title">List of chids</h5>
<div class="list-group col-6">
<table class="table">
<thead>
<tr>
<th scope="col" data-sortable="">
{% trans "Type" %}
</th>
<th scope="col" data-sortable="">
{% trans "Identificator" %}
</th>
<th scope="col" data-sortable="">
{% trans "Data" %}
</th>
</tr>
</thead>
{% for snap in object.annotations %}
<tbody>
{% if snap.type == 0 %}
<tr>
<td>
{{ snap.key }}
</td>
<td>
<small class="text-muted">
<a href="{% url 'device:details' snap.value %}">{{ snap.value }}</a>
</small>
</td>
<td>
<small class="text-muted">
{{ snap.created }}
</small>
</td>
</tr>
{% endif %}
</tbody>
{% endfor %}
</table>
{% for snap in object.annotations %}
{% if snap.type == 0 %}
<div class="list-group-item">
<div class="d-flex w-100 justify-content-between">
<h5 class="mb-1"></h5>
<small class="text-muted">
{{ snap.created }}
</small>
</div>
<p class="mb-1">
{{ snap.key }}<br />
</p>
<small class="text-muted">
<a href="{% url 'device:details' snap.value %}">{{ snap.value }}</a>
</small>
</div>
{% endif %}
{% endfor %}
</div>
</div>
<div class="tab-pane fade" id="tag">
@ -115,7 +97,7 @@
if (hash) {
// Buscar el botón o enlace que corresponde al hash y activarlo
const tabTrigger = document.querySelector(`[data-bs-target="${hash}"]`);
if (tabTrigger) {
// Crear una instancia de tab de Bootstrap para activar el tab
const tab = new bootstrap.Tab(tabTrigger);

View File

@ -8,21 +8,23 @@
</div>
</div>
<!-- override invalid-feedback class -->
<style>
.invalid-feedback {
color: #670000;
font-size: 1rem;
}
</style>
{% load django_bootstrap5 %}
<form role="form" method="post" enctype="multipart/form-data">
{% csrf_token %}
{% bootstrap_form form alert_error_type="none" error_css_class="alert alert-danger alert-icon alert-icon-border" %}
<div class="form-actions-no-box">
<a class="btn btn-grey" href="{% url 'dashboard:unassigned_devices' %}">{% translate "Cancel" %}</a>
{% if form.errors %}
<div class="alert alert-danger alert-icon alert-icon-border alert-dismissible" role="alert">
<div class="icon"><span class="mdi mdi-close-circle-o"></span></div>
<div class="message">
{% for field, error in form.errors.items %}
{{ error }}<br />
{% endfor %}
<button class="btn-close" type="button" data-dismiss="alert" aria-label="Close"></button>
</div>
</div>
{% endif %}
{% bootstrap_form form %}
<div class="form-actions-no-box">
<a class="btn btn-grey" href="{% url 'dashboard:unassigned_devices' %}">{% translate "Cancel" %}</a>
<input class="btn btn-green-admin" type="submit" name="submit" value="{% translate 'Save' %}" />
</div>

View File

@ -1,6 +1,5 @@
import json
from django.contrib import messages
from urllib.parse import urlparse
from django.http import HttpResponse
from django.utils.translation import gettext_lazy as _
@ -48,7 +47,6 @@ class UploadView(DashboardView, FormView):
def form_valid(self, form):
form.save(self.request.user)
messages.success(self.request, _("Evidence uploaded successfully."))
response = super().form_valid(form)
return response
@ -72,7 +70,6 @@ class ImportView(DashboardView, FormView):
def form_valid(self, form):
form.save()
messages.success(self.request, _("Evidence imported successfully."))
response = super().form_valid(form)
return response

View File

@ -110,6 +110,7 @@
<script src="/static/js/bootstrap.bundle.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/feather-icons@4.28.0/dist/feather.min.js" integrity="sha384-uO3SXW5IuS1ZpFPKugNNWqTZRRglnUJK6UAZ/gxOX80nxEkN9NcGZTftn6RzhGWE" crossorigin="anonymous"></script>
<script src="https://cdn.jsdelivr.net/npm/chart.js@2.9.4/dist/Chart.min.js" integrity="sha384-zNy6FEbO50N+Cg5wap8IKA4M/ZnLJgzc6w2NqACZaK0u0FXfOWRRJOnQtpZun8ha" crossorigin="anonymous"></script>
<script src="/static/js/dashboard.js"></script>
<script>
const togglePassword = document.querySelector('#togglePassword');
const password = document.querySelector('#id_password');

View File

@ -88,8 +88,8 @@ def create_annotation(doc, user, commit=False):
)
if annotation:
txt = "Warning: Snapshot %s already registered (annotation exists)"
logger.warning(txt, doc["uuid"])
txt = "Warning: Snapshot {} exist as annotation !!".format(doc["uuid"])
logger.exception(txt)
return annotation
return Annotation.objects.create(**data)

View File

@ -1,37 +0,0 @@
import logging
from django.conf import settings
# Colors
RED = "\033[91m"
PURPLE = "\033[95m"
YELLOW = "\033[93m"
RESET = "\033[0m"
class CustomFormatter(logging.Formatter):
def format(self, record):
if record.levelname == "ERROR":
color = RED
elif record.levelname == "WARNING":
color = YELLOW
elif record.levelname in ["INFO", "DEBUG"]:
color = PURPLE
else:
color = RESET
record.levelname = f"{color}{record.levelname}{RESET}"
if record.args:
record.msg = self.highlight_args(record.msg, record.args, color)
record.args = ()
# provide trace when DEBUG config
if settings.DEBUG:
import traceback
print(traceback.format_exc())
return super().format(record)
def highlight_args(self, message, args, color):
highlighted_args = tuple(f"{color}{arg}{RESET}" for arg in args)
return message % highlighted_args