Compare commits

...

82 commits

Author SHA1 Message Date
pedro 78de86c43a Merge pull request 'inxi (take 2)' (#38) from inxi into main
Reviewed-on: #38
2024-12-16 18:48:41 +00:00
pedro 498da24ee6 docker-reset: bugfix to reset, add flag file 2024-12-16 19:37:59 +01:00
pedro 67213e46d2 bugfix typo on entrypoint 2024-12-16 19:29:48 +01:00
pedro 5ec44d3378 better .env.example (just edit DOMAIN) 2024-12-16 19:28:08 +01:00
pedro 1d195806aa bad bugfix: docker entrypoint overrides .env
because of dpp
2024-12-16 19:25:32 +01:00
pedro d8202fea0f dockerfile: remove unnecessary pip upgrade 2024-12-16 19:19:19 +01:00
pedro e6fea8c1c3 update requirements so works on lxc-docker 2024-12-16 19:19:16 +01:00
Cayo Puigdefabregas 782f6dac51 refactor inxi 2024-12-16 18:12:02 +01:00
Cayo Puigdefabregas 46bbb940d7 fix parsing with credentials 2024-12-16 18:04:07 +01:00
Cayo Puigdefabregas abdefe885e fix parsing 2024-12-16 18:04:07 +01:00
Cayo Puigdefabregas 1902647337 fix get_hid 2024-12-16 18:04:07 +01:00
Cayo Puigdefabregas 989120bd0a fix component empty 2024-12-16 18:04:07 +01:00
Cayo Puigdefabregas dbdcffe5d9 add inxi in parsing and show in details of devs 2024-12-16 18:04:03 +01:00
pedro 1ea400051c Merge pull request 'DPP/DLT functionality' (#36) from dpp into main
Reviewed-on: #36
2024-12-11 15:15:14 +00:00
pedro f5ea0f8d8d docker entrypoint: bugfix when DPP env var unbound 2024-12-11 11:25:35 +01:00
pedro c5dd3a759d docker entrypoint: adapt it to DPP env var 2024-12-11 11:23:35 +01:00
pedro 3bf23c5d3b propagate DPP env var to docker 2024-12-11 11:16:32 +01:00
Cayo Puigdefabregas e54d8ba49c activate/deactivate DPP from env 2024-12-05 10:54:41 +01:00
Cayo Puigdefabregas 7f44d88b61 convert jsonld in credentials for dpps 2024-12-04 18:25:16 +01:00
Cayo Puigdefabregas b2e6406fb6 drop loggers 2024-11-29 17:23:44 +01:00
Cayo Puigdefabregas b477431fcd fix register dpp 2024-11-29 16:00:10 +01:00
Cayo Puigdefabregas 2c21977c7c fix did dpp 2024-11-29 15:48:48 +01:00
Cayo Puigdefabregas 563e394afc debug timestamp 2024-11-28 21:24:59 +01:00
pedro f566bfdf03 bugfix attempt verifyProof
co-authored with cayo
2024-11-28 21:14:32 +01:00
Cayo Puigdefabregas f04d425a31 more and more debug 2024-11-28 20:59:43 +01:00
Cayo Puigdefabregas 8aefc90ece more debug 2024-11-28 20:21:21 +01:00
Cayo Puigdefabregas 13249d564f fix call to proofs 2024-11-28 18:48:50 +01:00
Cayo Puigdefabregas ddb764b3ea dpp for proofs 2024-11-28 17:25:40 +01:00
Cayo Puigdefabregas 351c62b45d dpp for proofs 2024-11-28 17:22:42 +01:00
Cayo Puigdefabregas 16a3a870be drop actions for dpp 2024-11-28 17:13:21 +01:00
Cayo Puigdefabregas 76c4b10fc4 drop actions for dpp 2024-11-28 17:01:44 +01:00
Cayo Puigdefabregas 1910609f68 debug in proof call 2024-11-28 16:54:31 +01:00
Cayo Puigdefabregas 53524643c8 fix cors origin 2024-11-28 16:37:38 +01:00
Cayo Puigdefabregas 9b7bbd6bcf fix json call to chid 2024-11-28 16:22:06 +01:00
Cayo Puigdefabregas f7f6da5892 fix phid hash list 2024-11-28 15:47:03 +01:00
Cayo Puigdefabregas 8643495a9d fix phid hash list 2024-11-28 15:40:09 +01:00
Cayo Puigdefabregas dd327e5231 fix phid 2024-11-28 15:33:57 +01:00
Cayo Puigdefabregas cea5a279b7 fix 2024-11-28 15:25:57 +01:00
Cayo Puigdefabregas 594904905b new document and out device and components 2024-11-28 12:21:34 +01:00
pedro 74f0d5a507 comment logger trace when DEBUG
is it necessary?
2024-11-28 00:00:09 +01:00
Cayo Puigdefabregas 6d53a2acda remove flask sintax for django sintax 2024-11-27 18:41:17 +01:00
Cayo Puigdefabregas e64e9b3c06 remove flask sintax for django sintax 2024-11-27 18:36:08 +01:00
Cayo Puigdefabregas f1c21af654 add_services 2024-11-27 18:29:36 +01:00
Cayo Puigdefabregas 6efcf5ac18 add result for dpp and for chid 2024-11-27 18:11:22 +01:00
Cayo Puigdefabregas 5631da453a fix new document json 2024-11-27 16:53:31 +01:00
Cayo Puigdefabregas 0c5adf87c6 fix get_result 2024-11-27 15:59:23 +01:00
pedro dde4a114a0 dhub settings: bugfix wrong DLT TOKEN 2024-11-27 15:50:33 +01:00
Cayo Puigdefabregas e791d0c63c fix get_result for get correct document 2024-11-27 15:50:07 +01:00
pedro 833c458840 progress on making it work
still fails
2024-11-27 15:06:41 +01:00
Cayo Puigdefabregas e6424af251 remove pdb 2024-11-27 13:08:43 +01:00
Cayo Puigdefabregas d31c5c7921 get_result for json 2024-11-27 13:07:13 +01:00
Cayo Puigdefabregas 5432d22f35 view dpp page 2024-11-27 13:07:13 +01:00
Cayo Puigdefabregas fdc431b43a fix 2024-11-27 13:07:13 +01:00
pedro ba6c955463 no sudo in docker-reset, all is with user 1000 2024-11-27 09:31:13 +01:00
pedro 8eb89e602d dh-django dockerfile: use uid 1000 (app)
at least temporarily
2024-11-27 09:28:39 +01:00
pedro 80ab639759 dh docker: bugfix wrong usage of up_snapshots 2024-11-27 01:38:24 +01:00
pedro 162fc9a8ef dh dockerfile: add time debpkg 2024-11-27 01:31:15 +01:00
pedro eec14f4ffb dh docker: bugfix wrong path in rm prev snapshots 2024-11-27 01:28:19 +01:00
pedro 3ab21b8c32 dh docker: create institution before first dlt usr 2024-11-27 01:25:15 +01:00
pedro 316184df47 bugfix logger 2024-11-27 01:21:52 +01:00
pedro 7d6119c11a logger: bugfix function name changed for highlight 2024-11-27 01:12:58 +01:00
pedro 1248ab6894 dh docker: cleanup other snapshots when dpp/dlt 2024-11-27 01:10:34 +01:00
pedro 41e4374612 add dpp 2024-11-27 01:09:06 +01:00
pedro 6e7cb0bf84 dh docker: first migrate, then config 2024-11-27 01:08:22 +01:00
pedro 09f6cd2e68 utils/logger: ensure msgs are logged 2024-11-27 00:52:20 +01:00
pedro e25b8e5994 logger: improve error handling 2024-11-27 00:38:03 +01:00
pedro 2793344684 dpp/dlt: fix typo 2024-11-27 00:21:41 +01:00
pedro f70c75a92f dpp/dlt: fix typo 2024-11-27 00:17:41 +01:00
pedro 83719edc8e dpp/dlt: fix typo 2024-11-27 00:15:16 +01:00
pedro 69bc95c0cd dh docker entrypoint: use appropriate new env vars 2024-11-27 00:11:44 +01:00
pedro 9863a59911 docker: remove unused vars in django
were used in the flask app devicehub-teal
2024-11-26 19:25:30 +01:00
pedro eb78d38c3a docker entrypoint: make DB_* optional 2024-11-26 12:47:15 +01:00
pedro a49b31dd85 docker devicehub-django entrypoint 2024-11-22 12:07:25 +01:00
Cayo Puigdefabregas 493c7636b2 fix 2024-11-20 12:04:56 +01:00
Cayo Puigdefabregas 88e036eb3c fix 2024-11-20 12:03:20 +01:00
Cayo Puigdefabregas b759c53e75 add memberFederated model 2024-11-20 10:52:22 +01:00
Cayo Puigdefabregas 7ab88ad290 add did view 2024-11-19 21:18:45 +01:00
Cayo Puigdefabregas f6d1cf719c add commands for setup to dlt 2024-11-19 21:18:45 +01:00
pedro fb836edfbb docker: add dpp python dep ereuseapitest 2024-11-19 18:11:48 +01:00
Cayo Puigdefabregas cc350775ed . 2024-11-19 18:02:43 +01:00
Cayo Puigdefabregas 0d574cae63 register device and dpp in dlt and dpp api 2024-11-18 19:37:08 +01:00
Cayo Puigdefabregas 9857891b63 first base for dpp 2024-11-15 18:56:11 +01:00
54 changed files with 2140 additions and 494 deletions

View file

@ -2,7 +2,8 @@ DOMAIN=localhost
DEMO=true DEMO=true
# note that with DEBUG=true, logs are more verbose (include tracebacks) # note that with DEBUG=true, logs are more verbose (include tracebacks)
DEBUG=true DEBUG=true
ALLOWED_HOSTS=localhost,localhost:8000,127.0.0.1, ALLOWED_HOSTS=${DOMAIN},${DOMAIN}:8000,127.0.0.1,127.0.0.1:8000
DPP=false
STATIC_ROOT=/tmp/static/ STATIC_ROOT=/tmp/static/
MEDIA_ROOT=/tmp/media/ MEDIA_ROOT=/tmp/media/

View file

@ -85,17 +85,21 @@ class NewSnapshotView(ApiMixing):
# except Exception: # except Exception:
# return JsonResponse({'error': 'Invalid Snapshot'}, status=400) # return JsonResponse({'error': 'Invalid Snapshot'}, status=400)
if not data.get("uuid"): ev_uuid = data.get("uuid")
if data.get("credentialSubject"):
ev_uuid = data["credentialSubject"].get("uuid")
if not ev_uuid:
txt = "error: the snapshot not have uuid" txt = "error: the snapshot not have uuid"
logger.error("%s", txt) logger.error("%s", txt)
return JsonResponse({'status': txt}, status=500) return JsonResponse({'status': txt}, status=500)
exist_annotation = Annotation.objects.filter( exist_annotation = Annotation.objects.filter(
uuid=data['uuid'] uuid=ev_uuid
).first() ).first()
if exist_annotation: if exist_annotation:
txt = "error: the snapshot {} exist".format(data['uuid']) txt = "error: the snapshot {} exist".format(ev_uuid)
logger.warning("%s", txt) logger.warning("%s", txt)
return JsonResponse({'status': txt}, status=500) return JsonResponse({'status': txt}, status=500)
@ -105,14 +109,14 @@ class NewSnapshotView(ApiMixing):
except Exception as err: except Exception as err:
if settings.DEBUG: if settings.DEBUG:
logger.exception("%s", err) logger.exception("%s", err)
snapshot_id = data.get("uuid", "") snapshot_id = ev_uuid
txt = "It is not possible to parse snapshot: %s." txt = "It is not possible to parse snapshot: %s."
logger.error(txt, snapshot_id) logger.error(txt, snapshot_id)
text = "fail: It is not possible to parse snapshot" text = "fail: It is not possible to parse snapshot"
return JsonResponse({'status': text}, status=500) return JsonResponse({'status': text}, status=500)
annotation = Annotation.objects.filter( annotation = Annotation.objects.filter(
uuid=data['uuid'], uuid=ev_uuid,
type=Annotation.Type.SYSTEM, type=Annotation.Type.SYSTEM,
# TODO this is hardcoded, it should select the user preferred algorithm # TODO this is hardcoded, it should select the user preferred algorithm
key="hidalgo1", key="hidalgo1",
@ -121,7 +125,7 @@ class NewSnapshotView(ApiMixing):
if not annotation: if not annotation:
logger.error("Error: No annotation for uuid: %s", data["uuid"]) logger.error("Error: No annotation for uuid: %s", ev_uuid)
return JsonResponse({'status': 'fail'}, status=500) return JsonResponse({'status': 'fail'}, status=500)
url_args = reverse_lazy("device:details", args=(annotation.value,)) url_args = reverse_lazy("device:details", args=(annotation.value,))

View file

@ -69,7 +69,11 @@
{{ dev.manufacturer }} {{ dev.manufacturer }}
</td> </td>
<td> <td>
{% if dev.version %}
{{dev.version}} {{ dev.model }}
{% else %}
{{ dev.model }} {{ dev.model }}
{% endif %}
</td> </td>
</tr> </tr>
</tbody> </tbody>

View file

@ -84,8 +84,11 @@ class SearchView(InventaryMixin):
return devices, count return devices, count
def get_annotations(self, xp): def get_annotations(self, xp):
snap = xp.document.get_data() snap = json.loads(xp.document.get_data())
uuid = json.loads(snap).get('uuid') if snap.get("credentialSubject"):
uuid = snap["credentialSubject"]["uuid"]
else:
uuid = snap["uuid"]
return Device.get_annotation_from_uuid(uuid, self.request.user.institution) return Device.get_annotation_from_uuid(uuid, self.request.user.institution)
def search_hids(self, query, offset, limit): def search_hids(self, query, offset, limit):

View file

@ -25,6 +25,7 @@ class Device:
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
# the id is the chid of the device # the id is the chid of the device
self.id = kwargs["id"] self.id = kwargs["id"]
self.uuid = kwargs.get("uuid")
self.pk = self.id self.pk = self.id
self.shortid = self.pk[:6].upper() self.shortid = self.pk[:6].upper()
self.algorithm = None self.algorithm = None
@ -103,11 +104,19 @@ class Device:
self.evidences = [Evidence(u) for u in self.uuids] self.evidences = [Evidence(u) for u in self.uuids]
def get_last_evidence(self): def get_last_evidence(self):
if self.last_evidence:
return
if self.uuid:
self.last_evidence = Evidence(self.uuid)
return
annotations = self.get_annotations() annotations = self.get_annotations()
if not annotations.count(): if not annotations.count():
return return
annotation = annotations.first() annotation = annotations.first()
self.last_evidence = Evidence(annotation.uuid) self.last_evidence = Evidence(annotation.uuid)
self.uuid = annotation.uuid
def is_eraseserver(self): def is_eraseserver(self):
if not self.uuids: if not self.uuids:
@ -126,6 +135,8 @@ class Device:
return False return False
def last_uuid(self): def last_uuid(self):
if self.uuid:
return self.uuid
return self.uuids[0] return self.uuids[0]
def get_lots(self): def get_lots(self):
@ -263,45 +274,44 @@ class Device:
@property @property
def is_websnapshot(self): def is_websnapshot(self):
if not self.last_evidence: self.get_last_evidence()
self.get_last_evidence()
return self.last_evidence.doc['type'] == "WebSnapshot" return self.last_evidence.doc['type'] == "WebSnapshot"
@property @property
def last_user_evidence(self): def last_user_evidence(self):
if not self.last_evidence: self.get_last_evidence()
self.get_last_evidence()
return self.last_evidence.doc['kv'].items() return self.last_evidence.doc['kv'].items()
@property @property
def manufacturer(self): def manufacturer(self):
if not self.last_evidence: self.get_last_evidence()
self.get_last_evidence()
return self.last_evidence.get_manufacturer() return self.last_evidence.get_manufacturer()
@property @property
def serial_number(self): def serial_number(self):
if not self.last_evidence: self.get_last_evidence()
self.get_last_evidence()
return self.last_evidence.get_serial_number() return self.last_evidence.get_serial_number()
@property @property
def type(self): def type(self):
self.get_last_evidence()
if self.last_evidence.doc['type'] == "WebSnapshot": if self.last_evidence.doc['type'] == "WebSnapshot":
return self.last_evidence.doc.get("device", {}).get("type", "") return self.last_evidence.doc.get("device", {}).get("type", "")
if not self.last_evidence:
self.get_last_evidence()
return self.last_evidence.get_chassis() return self.last_evidence.get_chassis()
@property @property
def model(self): def model(self):
if not self.last_evidence: self.get_last_evidence()
self.get_last_evidence()
return self.last_evidence.get_model() return self.last_evidence.get_model()
@property @property
def components(self): def version(self):
if not self.last_evidence: if not self.last_evidence:
self.get_last_evidence() self.get_last_evidence()
return self.last_evidence.get_version()
@property
def components(self):
self.get_last_evidence()
return self.last_evidence.get_components() return self.last_evidence.get_components()

View file

@ -29,6 +29,11 @@
<li class="nav-item"> <li class="nav-item">
<a href="#evidences" class="nav-link" data-bs-toggle="tab" data-bs-target="#evidences">{% trans 'Evidences' %}</a> <a href="#evidences" class="nav-link" data-bs-toggle="tab" data-bs-target="#evidences">{% trans 'Evidences' %}</a>
</li> </li>
{% if dpps %}
<li class="nav-item">
<a href="#dpps" class="nav-link" data-bs-toggle="tab" data-bs-target="#dpps">{% trans 'Dpps' %}</a>
</li>
{% endif %}
<li class="nav-item"> <li class="nav-item">
<a class="nav-link" href="{% url 'device:device_web' object.id %}" target="_blank">Web</a> <a class="nav-link" href="{% url 'device:device_web' object.id %}" target="_blank">Web</a>
</li> </li>
@ -53,34 +58,41 @@
</div> </div>
{% endif %} {% endif %}
<div class="row mb-3"> <div class="row mb-1">
<div class="col-lg-3 col-md-4 label">Type</div> <div class="col-lg-3 col-md-4 label">Type</div>
<div class="col-lg-9 col-md-8">{{ object.type }}</div> <div class="col-lg-9 col-md-8">{{ object.type }}</div>
</div> </div>
{% if object.is_websnapshot and object.last_user_evidence %} {% if object.is_websnapshot and object.last_user_evidence %}
{% for k, v in object.last_user_evidence %} {% for k, v in object.last_user_evidence %}
<div class="row mb-3"> <div class="row mb-1">
<div class="col-lg-3 col-md-4 label">{{ k }}</div> <div class="col-lg-3 col-md-4 label">{{ k }}</div>
<div class="col-lg-9 col-md-8">{{ v|default:'' }}</div> <div class="col-lg-9 col-md-8">{{ v|default:'' }}</div>
</div> </div>
{% endfor %} {% endfor %}
{% else %} {% else %}
<div class="row mb-3"> <div class="row mb-1">
<div class="col-lg-3 col-md-4 label"> <div class="col-lg-3 col-md-4 label">
{% trans 'Manufacturer' %} {% trans 'Manufacturer' %}
</div> </div>
<div class="col-lg-9 col-md-8">{{ object.manufacturer|default:'' }}</div> <div class="col-lg-9 col-md-8">{{ object.manufacturer|default:'' }}</div>
</div> </div>
<div class="row mb-3"> <div class="row mb-1">
<div class="col-lg-3 col-md-4 label"> <div class="col-lg-3 col-md-4 label">
{% trans 'Model' %} {% trans 'Model' %}
</div> </div>
<div class="col-lg-9 col-md-8">{{ object.model|default:'' }}</div> <div class="col-lg-9 col-md-8">{{ object.model|default:'' }}</div>
</div> </div>
<div class="row mb-3"> <div class="row mb-1">
<div class="col-lg-3 col-md-4 label">
{% trans 'Version' %}
</div>
<div class="col-lg-9 col-md-8">{{ object.version|default:'' }}</div>
</div>
<div class="row mb-1">
<div class="col-lg-3 col-md-4 label"> <div class="col-lg-3 col-md-4 label">
{% trans 'Serial Number' %} {% trans 'Serial Number' %}
</div> </div>
@ -229,6 +241,25 @@
{% endfor %} {% endfor %}
</div> </div>
</div> </div>
{% if dpps %}
<div class="tab-pane fade" id="dpps">
<h5 class="card-title">{% trans 'List of dpps' %}</h5>
<div class="list-group col">
{% for d in dpps %}
<div class="list-group-item">
<div class="d-flex w-100 justify-content-between">
<small class="text-muted">{{ d.timestamp }}</small>
<span>{{ d.type }}</span>
</div>
<p class="mb-1">
<a href="{% url 'did:device_web' d.signature %}">{{ d.signature }}</a>
</p>
</div>
{% endfor %}
</div>
</div>
{% endif %}
</div> </div>
{% endblock %} {% endblock %}
@ -237,12 +268,12 @@
document.addEventListener('DOMContentLoaded', function () { document.addEventListener('DOMContentLoaded', function () {
// Obtener el hash de la URL (ejemplo: #components) // Obtener el hash de la URL (ejemplo: #components)
const hash = window.location.hash const hash = window.location.hash
// Verificar si hay un hash en la URL // Verificar si hay un hash en la URL
if (hash) { if (hash) {
// Buscar el botón o enlace que corresponde al hash y activarlo // Buscar el botón o enlace que corresponde al hash y activarlo
const tabTrigger = document.querySelector(`[data-bs-target="${hash}"]`) const tabTrigger = document.querySelector(`[data-bs-target="${hash}"]`)
if (tabTrigger) { if (tabTrigger) {
// Crear una instancia de tab de Bootstrap para activar el tab // Crear una instancia de tab de Bootstrap para activar el tab
const tab = new bootstrap.Tab(tabTrigger) const tab = new bootstrap.Tab(tabTrigger)

View file

@ -1,7 +1,5 @@
import json
from django.http import JsonResponse from django.http import JsonResponse
from django.conf import settings
from django.http import Http404
from django.urls import reverse_lazy from django.urls import reverse_lazy
from django.shortcuts import get_object_or_404, Http404 from django.shortcuts import get_object_or_404, Http404
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
@ -16,6 +14,9 @@ from evidence.models import Annotation
from lot.models import LotTag from lot.models import LotTag
from device.models import Device from device.models import Device
from device.forms import DeviceFormSet from device.forms import DeviceFormSet
if settings.DPP:
from dpp.models import Proof
from dpp.api_dlt import PROOF_TYPE
class NewDeviceView(DashboardView, FormView): class NewDeviceView(DashboardView, FormView):
@ -103,10 +104,17 @@ class DetailsView(DashboardView, TemplateView):
context = super().get_context_data(**kwargs) context = super().get_context_data(**kwargs)
self.object.initial() self.object.initial()
lot_tags = LotTag.objects.filter(owner=self.request.user.institution) lot_tags = LotTag.objects.filter(owner=self.request.user.institution)
dpps = []
if settings.DPP:
dpps = Proof.objects.filter(
uuid__in=self.object.uuids,
type=PROOF_TYPE["IssueDPP"]
)
context.update({ context.update({
'object': self.object, 'object': self.object,
'snapshot': self.object.get_last_evidence(), 'snapshot': self.object.get_last_evidence(),
'lot_tags': lot_tags, 'lot_tags': lot_tags,
'dpps': dpps,
}) })
return context return context

View file

@ -91,6 +91,11 @@ INSTALLED_APPS = [
"api", "api",
] ]
DPP = config("DPP", default=False, cast=bool)
if DPP:
INSTALLED_APPS.extend(["dpp", "did"])
MIDDLEWARE = [ MIDDLEWARE = [
"django.middleware.security.SecurityMiddleware", "django.middleware.security.SecurityMiddleware",
@ -239,3 +244,9 @@ LOGGING = {
SNAPSHOT_PATH="/tmp/" SNAPSHOT_PATH="/tmp/"
DATA_UPLOAD_MAX_NUMBER_FILES = 1000 DATA_UPLOAD_MAX_NUMBER_FILES = 1000
COMMIT = config('COMMIT', default='') COMMIT = config('COMMIT', default='')
# DLT SETTINGS
TOKEN_DLT = config("API_DLT_TOKEN", default=None)
API_DLT = config("API_DLT", default=None)
API_RESOLVER = config("API_RESOLVER", default=None)
ID_FEDERATED = config("ID_FEDERATED", default=None)

View file

@ -14,7 +14,7 @@ Including another URLconf
1. Import the include() function: from django.urls import include, path 1. Import the include() function: from django.urls import include, path
2. Add a URL to urlpatterns: path('blog/', include('blog.urls')) 2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
""" """
from django.conf import settings
from django.urls import path, include from django.urls import path, include
urlpatterns = [ urlpatterns = [
@ -28,3 +28,9 @@ urlpatterns = [
path("lot/", include("lot.urls")), path("lot/", include("lot.urls")),
path('api/', include('api.urls')), path('api/', include('api.urls')),
] ]
if settings.DPP:
urlpatterns.extend([
path('dpp/', include('dpp.urls')),
path('did/', include('did.urls')),
])

0
did/__init__.py Normal file
View file

3
did/admin.py Normal file
View file

@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

6
did/apps.py Normal file
View file

@ -0,0 +1,6 @@
from django.apps import AppConfig
class DidConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "did"

View file

3
did/models.py Normal file
View file

@ -0,0 +1,3 @@
from django.db import models
# Create your models here.

View file

@ -0,0 +1,41 @@
dpp_tmpl = {
"@context": [
"https://www.w3.org/ns/credentials/v2",
"https://test.uncefact.org/vocabulary/untp/dpp/0.5.0/"
],
"type": [
"DigitalProductPassport",
"VerifiableCredential"
],
"id": "https://example.ereuse.org/credentials/2a423366-a0d6-4855-ba65-2e0c926d09b0",
"issuer": {
"type": [
"CredentialIssuer"
],
"id": "did:web:r1.identifiers.ereuse.org:did-registry:z6Mkoreij5y9bD9fL5SGW6TfMUmcbaV7LCPwZHCFEEZBrVYQ#z6Mkoreij5y9bD9fL5SGW6TfMUmcbaV7LCPwZHCFEEZBrVYQ",
"name": "Refurbisher One"
},
"validFrom": "2024-11-15T12:00:00",
"validUntil": "2034-11-15T12:00:00",
"credentialSubject": {
"type": [
"Product"
],
"id": "https://id.ereuse.org/01/09520123456788/21/12345",
"name": "Refurbished XYZ Lenovo laptop item",
"registeredId": "09520123456788.21.12345",
"description": "XYZ Lenovo laptop refurbished by Refurbisher One",
"data": ""
},
"credentialSchema": {
"id": "https://idhub.pangea.org/vc_schemas/dpp.json",
"type": "FullJsonSchemaValidator2021",
"proof": {
"type": "Ed25519Signature2018",
"proofPurpose": "assertionMethod",
"verificationMethod": "did:web:r1.identifiers.ereuse.org:did-registry:z6Mkoreij5y9bD9fL5SGW6TfMUmcbaV7LCPwZHCFEEZBrVYQ#z6Mkoreij5y9bD9fL5SGW6TfMUmcbaV7LCPwZHCFEEZBrVYQ",
"created": "2024-12-03T15:33:42Z",
"jws": "eyJhbGciOiJFZERTQSIsImNyaXQiOlsiYjY0Il0sImI2NCI6ZmFsc2V9..rBPqbOcZCXB7GAnq1XIfV9Jvw4MKXlHff7qZkRfgwQ0Hnd9Ujt5s1xT4O0K6VESzWvdP2mOvMvu780fVNfraBQ"
}
}
}

View file

@ -0,0 +1,497 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>{{ object.type }}</title>
<link href="https://cdnjs.cloudflare.com/ajax/libs/bootstrap/5.1.3/css/bootstrap.min.css" rel="stylesheet" />
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap-icons@1.11.1/font/bootstrap-icons.css" />
<style>
body {
font-size: 0.875rem;
background-color: #f8f9fa;
display: flex;
flex-direction: column;
min-height: 100vh;
}
.custom-container {
background-color: #ffffff;
border-radius: 10px;
box-shadow: 0 0 20px rgba(0, 0, 0, 0.1);
padding: 30px;
margin-top: 30px;
flex-grow: 1;
}
.section-title {
color: #7a9f4f;
border-bottom: 2px solid #9cc666;
padding-bottom: 10px;
margin-bottom: 20px;
font-size: 1.5em;
}
.info-row {
margin-bottom: 10px;
}
.info-label {
font-weight: bold;
color: #545f71;
}
.info-value {
color: #333;
}
.component-card {
background-color: #f8f9fa;
border-left: 4px solid #9cc666;
margin-bottom: 15px;
transition: all 0.3s ease;
}
.component-card:hover {
box-shadow: 0 5px 15px rgba(0, 0, 0, 0.1);
transform: translateY(-2px);
}
.hash-value {
word-break: break-all;
background-color: #f3f3f3;
padding: 5px;
border-radius: 4px;
font-family: monospace;
font-size: 0.9em;
border: 1px solid #e0e0e0;
}
.card-title {
color: #9cc666;
}
.btn-primary {
background-color: #9cc666;
border-color: #9cc666;
padding: 0.1em 2em;
font-weight: 700;
}
.btn-primary:hover {
background-color: #8ab555;
border-color: #8ab555;
}
.btn-green-user {
background-color: #c7e3a3;
}
.btn-grey {
background-color: #f3f3f3;
}
footer {
background-color: #545f71;
color: #ffffff;
text-align: center;
padding: 10px 0;
margin-top: 20px;
}
</style>
</head>
<body>
<div class="container custom-container">
<nav class="header-nav ms-auto">
<div class="d-flex align-items-right">
<span class="nav-item">
{% if not roles and user.is_anonymous %}
<button class="btn btn-primary" data-bs-toggle="modal" data-bs-target="#validateModal">Validate</button>
{% else %}
<button class="btn btn-primary" id="buttonRole" data-bs-toggle="modal" data-bs-target="#rolesModal">Select your role</button>
<a class="btn btn-primary" href="{% url 'login:logout' %}?next={{ path }}">Logout</a>
{% endif %}
</span>
</div>
{% if role %}
<div class="d-flex justify-content-end">
<span class="nav-item">
Current Role: {{ role }}
</span>
</div>
{% endif %}
</nav>
<h1 class="text-center mb-4" style="color: #545f71;">{{ object.manufacturer }} {{ object.type }} {{ object.model }}</h1>
<div class="row">
<div class="col-lg-6">
{% if manuals.details.logo %}
<img style="max-width: 50px; margin-right: 15px;" src="{{ manuals.details.logo }}" />
{% endif %}
</div>
<div class="col-lg-6">
{% if manuals.details.image %}
<img style="width: 100px;" src="{{ manuals.details.image }}" />
{% endif %}
</div>
</div>
<div class="row">
<div class="col-lg-6">
<h2 class="section-title">Details</h2>
<div class="info-row row">
<div class="col-md-4 info-label">Phid</div>
<div class="col-md-8 info-value">
<div class="hash-value">{{ object.id }}</div>
</div>
</div>
<div class="info-row row">
<div class="col-md-4 info-label">Type</div>
<div class="col-md-8 info-value">{{ object.type }}</div>
</div>
{% if object.is_websnapshot %}
{% for snapshot_key, snapshot_value in object.last_user_evidence %}
<div class="info-row row">
<div class="col-md-4 info-label">{{ snapshot_key }}</div>
<div class="col-md-8 info-value">{{ snapshot_value|default:'' }}</div>
</div>
{% endfor %}
{% else %}
<div class="info-row row">
<div class="col-md-4 info-label">Manufacturer</div>
<div class="col-md-8 info-value">{{ object.manufacturer|default:'' }}</div>
</div>
<div class="info-row row">
<div class="col-md-4 info-label">Model</div>
<div class="col-md-8 info-value">{{ object.model|default:'' }}</div>
</div>
{% if user.is_authenticated %}
<div class="info-row row">
<div class="col-md-4 info-label">Serial Number</div>
<div class="col-md-8 info-value">{{ object.serial_number|default:'' }}</div>
</div>
{% endif %}
{% endif %}
</div>
<div class="col-lg-6">
<h2 class="section-title">Identifiers</h2>
{% for chid in object.hids %}
<div class="info-row">
<div class="hash-value">{{ chid|default:'' }}</div>
</div>
{% endfor %}
</div>
</div>
<h2 class="section-title mt-5">Components</h2>
<div class="row">
{% for component in object.components %}
<div class="col-md-6 mb-3">
<div class="card component-card">
<div class="card-body">
<h5 class="card-title">{{ component.type }}</h5>
<p class="card-text">
{% for component_key, component_value in component.items %}
{% if component_key not in 'actions,type' %}
{% if component_key != 'serialNumber' or user.is_authenticated %}
<strong>{{ component_key }}:</strong> {{ component_value }}<br />
{% endif %}
{% endif %}
{% endfor %}
</p>
</div>
</div>
</div>
{% endfor %}
</div>
{% if manuals.icecat %}
<h5 class="card-title">Icecat data sheet</h5>
<div class="row">
<div class="col-12 list-group-item d-flex align-items-center">
{% if manuals.details.logo %}
<img style="max-width: 50px; margin-right: 15px;" src="{{ manuals.details.logo }}" />
{% endif %}
{% if manuals.details.image %}
<img style="max-width: 100px; margin-right: 15px;" src="{{ manuals.details.image }}" />
{% endif %}
{% if manuals.details.pdf %}
<a href="{{ manuals.details.pdf }}" target="_blank">{{ manuals.details.title }}</a><br />
{% else %}
{{ manuals.details.title }}<br />
{% endif %}
</div>
<div class="col-12 accordion-item">
<h5 class="card-title accordion-header">
<button class="accordion-button collapsed" data-bs-target="#manuals-icecat" type="button"
data-bs-toggle="collapse" aria-expanded="false">
More examples
</button>
</h5>
<div id="manuals-icecat" class="row accordion-collapse collapse">
<div class="accordion-body">
{% for m in manuals.icecat %}
<div class="list-group-item d-flex align-items-center">
{% if m.logo %}
<img style="max-width: 50px; margin-right: 15px;" src="{{ m.logo }}" />
{% endif %}
{% if m.pdf %}
<a href="{{ m.pdf }}" target="_blank">{{ m.title }}</a><br />
{% else %}
{{ m.title }}<br />
{% endif %}
</div>
{% endfor %}
</div>
</div>
</div>
</div>
{% endif %}
{% if manuals.laer %}
<div class="row mt-3">
<div class="col-12">
<h5 class="card-title">Recycled Content</h5>
<div class="row mb-3">
<div class="col-sm-2">
Metal
</div>
<div class="col-sm-10">
<div class="progress">
<div class="progress-bar"
role="progressbar"
style="width: {{ manuals.laer.0.metal }}%"
aria-valuenow="{{ manuals.laer.0.metal }}"
aria-valuemin="0"
aria-valuemax="100">{{ manuals.laer.0.metal }}%
</div>
</div>
</div>
</div>
<div class="row mb-3">
<div class="col-sm-2">
Plastic post Consumer
</div>
<div class="col-sm-10">
<div class="progress">
<div class="progress-bar"
role="progressbar"
style="width: {{ manuals.laer.0.plastic_post_consumer }}%"
aria-valuenow="{{ manuals.laer.0.plastic_post_consumer }}"
aria-valuemin="0"
aria-valuemax="100">{{ manuals.laer.0.plastic_post_consumer }}%
</div>
</div>
</div>
</div>
<div class="row mb-3">
<div class="col-sm-2">
Plastic post Industry
</div>
<div class="col-sm-10">
<div class="progress">
<div class="progress-bar"
role="progressbar"
style="width: {{ manuals.laer.0.plastic_post_industry }}%"
aria-valuenow="{{ manuals.laer.0.plastic_post_industry }}"
aria-valuemin="0"
aria-valuemax="100">{{ manuals.laer.0.plastic_post_industry }}%
</div>
</div>
</div>
</div>
</div>
</div>
{% endif %}
{% if manuals.energystar %}
<div class="row mt-3">
<div class="col-12">
<h5 class="card-title">Energy spent</h5>
{% if manuals.energystar.long_idle_watts %}
<div class="row mb-3">
<div class="col-sm-10">
Consumption when inactivity power function is activated (watts)
</div>
<div class="col-sm-2">
{{ manuals.energystar.long_idle_watts }}
</div>
</div>
{% endif %}
{% if manuals.energystar.short_idle_watts %}
<div class="row mb-3">
<div class="col-sm-10">
Consumption when inactivity power function is not activated (watts)
</div>
<div class="col-sm-2">
{{ manuals.energystar.short_idle_watts }}
</div>
</div>
{% endif %}
{% if manuals.energystar.sleep_mode_watts %}
<div class="row mb-3">
<div class="col-sm-10">
sleep_mode_watts
Consumption when computer goes into sleep mode (watts)
</div>
<div class="col-sm-2">
{{ manuals.energystar.sleep_mode_watts }}
</div>
</div>
{% endif %}
{% if manuals.energystar.off_mode_watts %}
<div class="row mb-3">
<div class="col-sm-10">
Consumption when the computer is off (watts)
</div>
<div class="col-sm-2">
{{ manuals.energystar.off_mode_watts }}
</div>
</div>
{% endif %}
{% if manuals.energystar.tec_allowance_kwh %}
<div class="row mb-3">
<div class="col-sm-10">
Power allocation for normal operation (kwh)
</div>
<div class="col-sm-2">
{{ manuals.energystar.tec_allowance_kwh }}
</div>
</div>
{% endif %}
{% if manuals.energystar.tec_of_model_kwh %}
<div class="row mb-3">
<div class="col-sm-10">
Consumption of the model configuration (kwh)
</div>
<div class="col-sm-2">
{{ manuals.energystar.tec_of_model_kwh }}
</div>
</div>
{% endif %}
{% if manuals.energystar.tec_requirement_kwh %}
<div class="row mb-3">
<div class="col-sm-10">
Energy allowance provided (kwh)
</div>
<div class="col-sm-2">
{{ manuals.energystar.tec_requirement_kwh }}
</div>
</div>
{% endif %}
{% if manuals.energystar.work_off_mode_watts %}
<div class="row mb-3">
<div class="col-sm-10">
The lowest power mode which cannot be switched off (watts)
</div>
<div class="col-sm-2">
{{ manuals.energystar.work_off_mode_watts }}
</div>
</div>
{% endif %}
{% if manuals.energystar.work_weighted_power_of_model_watts %}
<div class="row mb-3">
<div class="col-sm-10">
Weighted energy consumption from all its states (watts)
</div>
<div class="col-sm-2">
{{ manuals.energystar.work_weighted_power_of_model_watts }}
</div>
</div>
{% endif %}
</div>
</div>
{% endif %}
{% if manuals.ifixit %}
<div class="row">
<div class="col-12 accordion-item">
<h5 class="card-title accordion-header">
<button class="accordion-button collapsed" data-bs-target="#manuals-repair" type="button"
data-bs-toggle="collapse" aria-expanded="false">
Repair manuals
</button>
</h5>
<div id="manuals-repair" class="row accordion-collapse collapse">
<div class="list-group col">
{% for m in manuals.ifixit %}
<div class="list-group-item d-flex align-items-center">
{% if m.image %}
<img style="max-width: 100px; margin-right: 15px;" src="{{ m.image }}" />
{% endif %}
{% if m.url %}
<a href="{{ m.url }}" target="_blank">{{ m.title }}</a><br />
{% else %}
{{ m.title }}<br />
{% endif %}
</div>
{% endfor %}
</div>
</div>
</div>
</div>
{% endif %}
<footer>
<p>
&copy;{% now 'Y' %}eReuse. All rights reserved.
</p>
</footer>
{% if user.is_anonymous and not roles %}
<div class="modal fade" id="validateModal" tabindex="-1" style="display: none;" aria-hidden="true">
<div class="modal-dialog modal-dialog-centered">
<div class="modal-content">
<div class="modal-header">
<h5 class="modal-title">Validate as <span id="title-action"></span></h5>
<button type="button" class="btn-close" data-bs-dismiss="modal" aria-label="Close"></button>
</div>
<div class="modal-body">
<a class="btn btn-primary" type="button"
href="{% url 'login:login' %}?next={{ path }}">
User of system
</a>
{% if oidc %}
<br />
<a class="btn btn-primary mt-3" type="button" href="{# url 'oidc:login_other_inventory' #}?next={{ path }}">
User of other inventory
</a>
{% endif %}
</div>
<div class="modal-footer"></div>
</div>
</div>
</div>
{% else %}
<div class="modal fade" id="rolesModal" tabindex="-1" style="display: none;" aria-hidden="true">
<div class="modal-dialog modal-dialog-centered">
<div class="modal-content">
<form action="{{ path }}" method="get">
<div class="modal-header">
<h5 class="modal-title">Select your Role <span id="title-action"></span></h5>
<button type="button" class="btn-close" data-bs-dismiss="modal" aria-label="Close"></button>
</div>
<div class="modal-body">
<select name="role">
{% for k, v in roles %}
<option value="{{ k }}" {% if v == role %}selected=selected{% endif %}>{{ v }}</option>
{% endfor %}
</select>
</div>
<div class="modal-footer">
<button type="button" class="btn btn-secondary" data-bs-dismiss="modal">Close</button>
<input type="submit" class="btn btn-primary" value="Send" />
</div>
</form>
</div>
</div>
</div>
{% endif %}
<script src="https://cdnjs.cloudflare.com/ajax/libs/bootstrap/5.1.3/js/bootstrap.bundle.min.js"></script>
</body>
</html>

3
did/tests.py Normal file
View file

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

8
did/urls.py Normal file
View file

@ -0,0 +1,8 @@
from django.urls import path
from did import views
app_name = 'did'
urlpatterns = [
path("<str:pk>", views.PublicDeviceWebView.as_view(), name="device_web"),
]

263
did/views.py Normal file
View file

@ -0,0 +1,263 @@
import json
import logging
from django.http import JsonResponse, Http404
from django.views.generic.base import TemplateView
from device.models import Device
from evidence.parse import Build
from dpp.api_dlt import ALGORITHM
from dpp.models import Proof
from dpp.api_dlt import PROOF_TYPE
from did.template_credential import dpp_tmpl
logger = logging.getLogger('django')
class PublicDeviceWebView(TemplateView):
template_name = "device_did.html"
def get(self, request, *args, **kwargs):
self.pk = kwargs['pk']
chid = self.pk.split(":")[0]
proof = Proof.objects.filter(signature=self.pk).first()
if proof:
self.object = Device(id=chid, uuid=proof.uuid)
else:
self.object = Device(id=chid)
if not self.object.last_evidence:
raise Http404
if self.request.headers.get('Accept') == 'application/json':
return self.get_json_response()
return super().get(request, *args, **kwargs)
def get_context_data(self, **kwargs):
self.context = super().get_context_data(**kwargs)
self.object.initial()
roles = [("Operator", "Operator")]
role = "Operator"
if self.request.user.is_anonymous:
roles = []
role = None
self.context.update({
'object': self.object,
'role': role,
'roles': roles,
'path': self.request.path,
'last_dpp': "",
'before_dpp': "",
})
if not self.request.user.is_anonymous:
self.get_manuals()
return self.context
@property
def public_fields(self):
return {
'id': self.object.id,
'shortid': self.object.shortid,
'uuids': self.object.uuids,
'hids': self.object.hids,
'components': self.remove_serial_number_from(self.object.components),
}
@property
def authenticated_fields(self):
return {
'serial_number': self.object.serial_number,
'components': self.object.components,
}
def remove_serial_number_from(self, components):
for component in components:
if 'serial_number' in component:
del component['SerialNumber']
return components
def get_device_data(self):
data = self.public_fields
if self.request.user.is_authenticated:
data.update(self.authenticated_fields)
return data
def get_json_response(self):
device_data = self.get_result()
# device_data = self.get_device_data()
response = JsonResponse(device_data)
response["Access-Control-Allow-Origin"] = "*"
return response
def get_result(self):
if len(self.pk.split(":")) > 1:
return self.build_from_dpp()
else:
return self.build_from_chid()
def build_from_dpp(self):
data = {
'document': {},
'dpp': self.pk,
'algorithm': ALGORITHM,
'components': [],
'manufacturer DPP': '',
'device': {},
}
dev = Build(self.object.last_evidence.doc, None, check=True)
doc = dev.get_phid()
data['document'] = json.dumps(doc)
data['device'] = dev.device
data['components'] = dev.components
self.object.get_evidences()
last_dpp = Proof.objects.filter(
uuid__in=self.object.uuids, type=PROOF_TYPE['IssueDPP']
).order_by("-timestamp").first()
key = self.pk
if last_dpp:
key = last_dpp.signature
url = "https://{}/did/{}".format(
self.request.get_host(),
key
)
data['url_last'] = url
tmpl = dpp_tmpl.copy()
tmpl["credentialSubject"]["data"] = data
return tmpl
def build_from_chid(self):
dpps = []
self.object.initial()
for d in self.object.evidences:
d.get_doc()
dev = Build(d.doc, None, check=True)
doc = dev.get_phid()
ev = json.dumps(doc)
phid = dev.get_signature(doc)
dpp = "{}:{}".format(self.pk, phid)
rr = {
'dpp': dpp,
'document': ev,
'algorithm': ALGORITHM,
'manufacturer DPP': '',
'device': dev.device,
'components': dev.components
}
tmpl = dpp_tmpl.copy()
tmpl["credentialSubject"]["data"] = rr
dpps.append(tmpl)
return {
'@context': ['https://ereuse.org/dpp0.json'],
'data': dpps,
}
def get_manuals(self):
manuals = {
'ifixit': [],
'icecat': [],
'details': {},
'laer': [],
'energystar': {},
}
try:
params = {
"manufacturer": self.object.manufacturer,
"model": self.object.model,
}
self.params = json.dumps(params)
manuals['ifixit'] = self.request_manuals('ifixit')
manuals['icecat'] = self.request_manuals('icecat')
manuals['laer'] = self.request_manuals('laer')
manuals['energystar'] = self.request_manuals('energystar') or {}
if manuals['icecat']:
manuals['details'] = manuals['icecat'][0]
except Exception as err:
logger.error("Error: {}".format(err))
self.context['manuals'] = manuals
self.parse_energystar()
def parse_energystar(self):
if not self.context.get('manuals', {}).get('energystar'):
return
# Defined in:
# https://dev.socrata.com/foundry/data.energystar.gov/j7nq-iepp
energy_types = [
'functional_adder_allowances_kwh',
'tec_allowance_kwh',
'long_idle_watts',
'short_idle_watts',
'off_mode_watts',
'sleep_mode_watts',
'tec_of_model_kwh',
'tec_requirement_kwh',
'work_off_mode_watts',
'work_weighted_power_of_model_watts',
]
energy = {}
for field in energy_types:
energy[field] = []
for e in self.context['manuals']['energystar']:
for field in energy_types:
for k, v in e.items():
if not v:
continue
if field in k:
energy[field].append(v)
for k, v in energy.items():
if not v:
energy[k] = 0
continue
tt = sum([float(i) for i in v])
energy[k] = round(tt / len(v), 2)
self.context['manuals']['energystar'] = energy
def request_manuals(self, prefix):
#TODO reimplement manuals service
response = {
"laer": [{"metal": 40, "plastic_post_consumer": 27, "plastic_post_industry": 34}],
"energystar": [{
'functional_adder_allowances_kwh': 180,
"long_idle_watts": 240,
"short_idle_watts": 120,
"sleep_mode_watts": 30,
"off_mode_watts": 3,
"tec_allowance_kwh": 180,
"tec_of_model_kwh": 150,
"tec_requirement_kwh": 220,
"work_off_mode_watts": 70,
"work_weighted_power_of_model_watts": 240
}],
"ifixit": [
{
"image": "https://guide-images.cdn.ifixit.com/igi/156EpI4YdQeVfVPa.medium",
"url": "https://es.ifixit.com/Gu%C3%ADa/HP+ProBook+450+G4+Back+Panel+Replacement/171196?lang=en",
"title": "HP ProBook 450 G4 Back Panel Replacement"
},
{
"image": "https://guide-images.cdn.ifixit.com/igi/usTIqCKpuxVWC3Ix.140x105",
"url": "https://es.ifixit.com/Gu%C3%ADa/HP+ProBook+450+G4+Display+Assembly+Replacement/171101?lang=en",
"title": "Display Assembly Replacement"
}
],
"icecat": [
{
"logo": "https://images.icecat.biz/img/brand/thumb/1_cf8603f6de7b4c4d8ac4f5f0ef439a05.jpg",
"image": "https://guide-images.cdn.ifixit.com/igi/Q2nYjTIQfG6GaI5B.standard",
"pdf": "https://icecat.biz/rest/product-pdf?productId=32951710&lang=en",
"title": "HP ProBook 450 G3"
}
]
}
return response.get(prefix, {})

View file

@ -9,6 +9,7 @@ services:
- ALLOWED_HOSTS=${ALLOWED_HOSTS:-$DOMAIN} - ALLOWED_HOSTS=${ALLOWED_HOSTS:-$DOMAIN}
- DEMO=${DEMO:-false} - DEMO=${DEMO:-false}
- PREDEFINED_TOKEN=${PREDEFINED_TOKEN:-} - PREDEFINED_TOKEN=${PREDEFINED_TOKEN:-}
- DPP=${DPP:-false}
volumes: volumes:
- .:/opt/devicehub-django - .:/opt/devicehub-django
ports: ports:

View file

@ -20,7 +20,9 @@ main() {
echo "WARNING: .env was not there, .env.example was copied, this only happens once" echo "WARNING: .env was not there, .env.example was copied, this only happens once"
fi fi
# remove old database # remove old database
sudo rm -vfr ./db/* rm -vfr ./db/*
# deactivate configured flag
rm -vfr ./already_configured
docker compose down -v docker compose down -v
docker compose build docker compose build
docker compose up ${detach_arg:-} docker compose up ${detach_arg:-}

View file

@ -7,8 +7,14 @@ RUN apt update && \
git \ git \
sqlite3 \ sqlite3 \
jq \ jq \
time \
vim \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
# TODO I don't like this, but the whole ereuse-dpp works with user 1000 because of the volume mapping
# thanks https://stackoverflow.com/questions/70520205/docker-non-root-user-best-practices-for-python-images
RUN adduser --home /opt/devicehub-django -u 1000 app
WORKDIR /opt/devicehub-django WORKDIR /opt/devicehub-django
# reduce size (python specifics) -> src https://stackoverflow.com/questions/74616667/removing-pip-cache-after-installing-dependencies-in-docker-image # reduce size (python specifics) -> src https://stackoverflow.com/questions/74616667/removing-pip-cache-after-installing-dependencies-in-docker-image
@ -22,15 +28,18 @@ compile = no
no-cache-dir = True no-cache-dir = True
END END
# upgrade pip, which might fail on lxc, then remove the "corrupted file"
RUN python -m pip install --upgrade pip || (rm -rf /usr/local/lib/python3.11/site-packages/pip-*.dist-info && python -m pip install --upgrade pip)
COPY ./requirements.txt /opt/devicehub-django COPY ./requirements.txt /opt/devicehub-django
RUN pip install -r requirements.txt RUN pip install -r requirements.txt
# TODO hardcoded, is ignored in requirements.txt
RUN pip install -i https://test.pypi.org/simple/ ereuseapitest==0.0.14
# TODO Is there a better way? # TODO Is there a better way?
# Set PYTHONPATH to include the directory with the xapian module # Set PYTHONPATH to include the directory with the xapian module
ENV PYTHONPATH="${PYTHONPATH}:/usr/lib/python3/dist-packages" ENV PYTHONPATH="${PYTHONPATH}:/usr/lib/python3/dist-packages"
COPY docker/devicehub-django.entrypoint.sh / COPY docker/devicehub-django.entrypoint.sh /
RUN chown -R app:app /opt/devicehub-django
USER app
ENTRYPOINT sh /devicehub-django.entrypoint.sh ENTRYPOINT sh /devicehub-django.entrypoint.sh

View file

@ -5,6 +5,149 @@ set -u
# DEBUG # DEBUG
set -x set -x
# TODO there is a conflict between two shared vars
# 1. from the original docker compose devicehub-teal
# 2. from the new docker compose that integrates all dpp services
wait_for_dpp_shared() {
while true; do
# specially ensure VERAMO_API_CRED_FILE is not empty,
# it takes some time to get data in
OPERATOR_TOKEN_FILE='operator-token.txt'
if [ -f "/shared/${OPERATOR_TOKEN_FILE}" ] && \
[ -f "/shared/create_user_operator_finished" ]; then
sleep 5
echo "Files ready to process."
break
else
echo "Waiting for file in shared: ${OPERATOR_TOKEN_FILE}"
sleep 5
fi
done
}
# 3. Generate an environment .env file.
# TODO cargar via shared
gen_env_vars() {
INIT_ORG="${INIT_ORG:-example-org}"
INIT_USER="${INIT_USER:-user@example.org}"
INIT_PASSWD="${INIT_PASSWD:-1234}"
ADMIN='True'
PREDEFINED_TOKEN="${PREDEFINED_TOKEN:-}"
# specific dpp env vars
if [ "${DPP:-}" = 'true' ]; then
# fill env vars in this docker entrypoint
wait_for_dpp_shared
export API_DLT='http://api_connector:3010'
export API_DLT_TOKEN="$(cat "/shared/${OPERATOR_TOKEN_FILE}")"
export API_RESOLVER='http://id_index_api:3012'
# TODO hardcoded
export ID_FEDERATED='DH1'
# propagate to .env
dpp_env_vars="$(cat <<END
API_DLT=${API_DLT}
API_DLT_TOKEN=${API_DLT_TOKEN}
API_RESOLVER=${API_RESOLVER}
ID_FEDERATED=${ID_FEDERATED}
END
)"
# generate config using env vars from docker
# TODO rethink if this is needed because now this is django, not flask
cat > .env <<END
${dpp_env_vars:-}
END
fi
}
handle_federated_id() {
# devicehub host and id federated checker
# //getAll queries are not accepted by this service, so we remove them
EXPECTED_ID_FEDERATED="$(curl -s "${API_RESOLVER%/}/getAll" \
| jq -r '.url | to_entries | .[] | select(.value == "'"${DEVICEHUB_HOST}"'") | .key' \
| head -n 1)"
# if is a new DEVICEHUB_HOST, then register it
if [ -z "${EXPECTED_ID_FEDERATED}" ]; then
# TODO better docker compose run command
cmd="docker compose run --entrypoint= devicehub flask dlt_insert_members ${DEVICEHUB_HOST}"
big_error "No FEDERATED ID maybe you should run \`${cmd}\`"
fi
# if not new DEVICEHUB_HOST, then check consistency
# if there is already an ID in the DLT, it should match with my internal ID
if [ ! "${EXPECTED_ID_FEDERATED}" = "${ID_FEDERATED}" ]; then
big_error "ID_FEDERATED should be ${EXPECTED_ID_FEDERATED} instead of ${ID_FEDERATED}"
fi
# not needed, but reserved
# EXPECTED_DEVICEHUB_HOST="$(curl -s "${API_RESOLVER%/}/getAll" \
# | jq -r '.url | to_entries | .[] | select(.key == "'"${ID_FEDERATED}"'") | .value' \
# | head -n 1)"
# if [ ! "${EXPECTED_DEVICEHUB_HOST}" = "${DEVICEHUB_HOST}" ]; then
# big_error "ERROR: DEVICEHUB_HOST should be ${EXPECTED_DEVICEHUB_HOST} instead of ${DEVICEHUB_HOST}"
# fi
}
config_dpp_part1() {
# 12. Add a new server to the 'api resolver'
if [ "${ID_SERVICE:-}" ]; then
handle_federated_id
else
# TODO when this runs more than one time per service, this is a problem, but for the docker-reset.sh workflow, that's fine
# TODO put this in already_configured
# TODO hardcoded http proto and port
./manage.py dlt_insert_members "http://${DOMAIN}:8000"
fi
# 13. Do a rsync api resolve
./manage.py dlt_rsync_members
# 14. Register a new user to the DLT
DATASET_FILE='/tmp/dataset.json'
cat > "${DATASET_FILE}" <<END
{
"email": "${INIT_USER}",
"password": "${INIT_PASSWD}",
"api_token": "${API_DLT_TOKEN}"
}
END
./manage.py dlt_register_user "${DATASET_FILE}"
}
config_phase() {
# TODO review this flag file
init_flagfile="${program_dir}/already_configured"
if [ ! -f "${init_flagfile}" ]; then
# non DL user (only for the inventory)
./manage.py add_institution "${INIT_ORG}"
# TODO: one error on add_user, and you don't add user anymore
./manage.py add_user "${INIT_ORG}" "${INIT_USER}" "${INIT_PASSWD}" "${ADMIN}" "${PREDEFINED_TOKEN}"
if [ "${DPP:-}" = 'true' ]; then
# 12, 13, 14
config_dpp_part1
# cleanup other spnapshots and copy dlt/dpp snapshots
# TODO make this better
rm example/snapshots/*
cp example/dpp-snapshots/*.json example/snapshots/
fi
# # 15. Add inventory snapshots for user "${INIT_USER}".
if [ "${DEMO:-}" = 'true' ]; then
/usr/bin/time ./manage.py up_snapshots example/snapshots/ "${INIT_USER}"
fi
# remain next command as the last operation for this if conditional
touch "${init_flagfile}"
fi
}
check_app_is_there() { check_app_is_there() {
if [ ! -f "./manage.py" ]; then if [ ! -f "./manage.py" ]; then
usage usage
@ -13,7 +156,7 @@ check_app_is_there() {
deploy() { deploy() {
# TODO this is weird, find better workaround # TODO this is weird, find better workaround
git config --global --add safe.directory /opt/devicehub-django git config --global --add safe.directory "${program_dir}"
export COMMIT=$(git log --format="%H %ad" --date=iso -n 1) export COMMIT=$(git log --format="%H %ad" --date=iso -n 1)
if [ "${DEBUG:-}" = 'true' ]; then if [ "${DEBUG:-}" = 'true' ]; then
@ -31,18 +174,7 @@ deploy() {
# inspired by https://medium.com/analytics-vidhya/django-with-docker-and-docker-compose-python-part-2-8415976470cc # inspired by https://medium.com/analytics-vidhya/django-with-docker-and-docker-compose-python-part-2-8415976470cc
echo "INFO detected NEW deployment" echo "INFO detected NEW deployment"
./manage.py migrate ./manage.py migrate
INIT_ORG="${INIT_ORG:-example-org}" config_phase
INIT_USER="${INIT_USER:-user@example.org}"
INIT_PASSWD="${INIT_PASSWD:-1234}"
ADMIN='True'
PREDEFINED_TOKEN="${PREDEFINED_TOKEN:-}"
./manage.py add_institution "${INIT_ORG}"
# TODO: one error on add_user, and you don't add user anymore
./manage.py add_user "${INIT_ORG}" "${INIT_USER}" "${INIT_PASSWD}" "${ADMIN}" "${PREDEFINED_TOKEN}"
if [ "${DEMO:-}" = 'true' ]; then
./manage.py up_snapshots example/snapshots/ "${INIT_USER}"
fi
fi fi
} }
@ -70,6 +202,7 @@ runserver() {
main() { main() {
program_dir='/opt/devicehub-django' program_dir='/opt/devicehub-django'
cd "${program_dir}" cd "${program_dir}"
gen_env_vars
deploy deploy
runserver runserver
} }

0
dpp/__init__.py Normal file
View file

3
dpp/admin.py Normal file
View file

@ -0,0 +1,3 @@
from django.contrib import admin
# Register your models here.

166
dpp/api_dlt.py Normal file
View file

@ -0,0 +1,166 @@
import json
import time
import logging
from django.conf import settings
from ereuseapi.methods import API
from dpp.models import Proof, UserDpp
logger = logging.getLogger('django')
# """The code of the status response of api dlt."""
STATUS_CODE = {
"Success": 201,
"Notwork": 400
}
ALGORITHM = "sha3-256"
PROOF_TYPE = {
'Register': 'Register',
'IssueDPP': 'IssueDPP',
'proof_of_recycling': 'proof_of_recycling',
'Erase': 'Erase',
'EWaste': 'EWaste',
}
def connect_api(user):
dp = UserDpp.objects.filter(user=user).first()
if not dp:
return
api_dlt = settings.API_DLT
token_dlt = dp.api_keys_dlt
if not api_dlt or not token_dlt:
logger.error("NOT POSSIBLE CONNECT WITH API DLT!!!")
return
return API(api_dlt, token_dlt, "ethereum")
def register_dlt(api, chid, phid, proof_type=None):
if proof_type:
return api.generate_proof(
chid,
ALGORITHM,
phid,
proof_type,
settings.ID_FEDERATED
)
return api.register_device(
chid,
ALGORITHM,
phid,
settings.ID_FEDERATED
)
def issuer_dpp_dlt(api, dpp):
phid = dpp.split(":")[1]
return api.issue_passport(
dpp,
ALGORITHM,
phid,
settings.ID_FEDERATED
)
def save_proof(signature, ev_uuid, result, proof_type, user):
if result['Status'] == STATUS_CODE.get("Success"):
timestamp = result.get('Data', {}).get('data', {}).get('timestamp')
if not timestamp:
return
logger.debug("timestamp: %s", timestamp)
d = {
"type": proof_type,
"timestamp": timestamp,
"issuer": user.institution,
"user": user,
"uuid": ev_uuid,
"signature": signature,
}
Proof.objects.create(**d)
def register_device_dlt(chid, phid, ev_uuid, user):
cny_a = 1
while cny_a:
api = connect_api(user)
if not api:
cny_a = 0
return
result = register_dlt(api, chid, phid)
try:
assert result['Status'] == STATUS_CODE.get("Success")
assert result['Data']['data']['timestamp']
cny_a = 0
except Exception:
if result.get("Data") != "Device already exists":
logger.error("API return: %s", result)
time.sleep(10)
else:
cny_a = 0
save_proof(phid, ev_uuid, result, PROOF_TYPE['Register'], user)
# TODO is neccesary?
if settings.ID_FEDERATED:
cny = 1
while cny:
try:
api.add_service(
chid,
'DeviceHub',
settings.ID_FEDERATED,
'Inventory service',
'Inv',
)
cny = 0
except Exception:
time.sleep(10)
def register_passport_dlt(chid, phid, ev_uuid, user):
token_dlt = settings.TOKEN_DLT
api_dlt = settings.API_DLT
if not token_dlt or not api_dlt:
return
dpp = "{chid}:{phid}".format(chid=chid, phid=phid)
if Proof.objects.filter(signature=dpp, type=PROOF_TYPE['IssueDPP']).exists():
return
cny_a = 1
while cny_a:
try:
api = connect_api(user)
if not api:
cny_a = 0
return
result = issuer_dpp_dlt(api, dpp)
cny_a = 0
except Exception as err:
logger.error("ERROR API issue passport return: %s", err)
time.sleep(10)
if result['Status'] is not STATUS_CODE.get("Success"):
logger.error("ERROR API issue passport return: %s", result)
return
save_proof(phid, ev_uuid, result, PROOF_TYPE['IssueDPP'], user)

6
dpp/apps.py Normal file
View file

@ -0,0 +1,6 @@
from django.apps import AppConfig
class DppConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "dpp"

View file

@ -0,0 +1,35 @@
import logging
import requests
from django.core.management.base import BaseCommand
from django.conf import settings
from user.models import Institution
logger = logging.getLogger('django')
class Command(BaseCommand):
help = "Insert a new Institution in DLT"
def add_arguments(self, parser):
parser.add_argument('domain', type=str, help='institution')
def handle(self, *args, **kwargs):
domain = kwargs.get("domain")
api = settings.API_RESOLVER
if not api:
logger.error("you need set the var API_RESOLVER")
return
if "http" not in domain:
logger.error("you need put https:// in %s", domain)
return
api = api.strip("/")
domain = domain.strip("/")
data = {"url": domain}
url = api + '/registerURL'
res = requests.post(url, json=data)
print(res.json())

View file

@ -0,0 +1,72 @@
import json
import logging
from ereuseapi.methods import API
from django.conf import settings
from django.core.management.base import BaseCommand
from user.models import User, Institution
from dpp.models import UserDpp
logger = logging.getLogger('django')
class Command(BaseCommand):
help = "Insert users than are in Dlt with params: path of data set file"
def add_arguments(self, parser):
parser.add_argument('dataset_file', type=str, help='institution')
def handle(self, *args, **kwargs):
dataset_file = kwargs.get("dataset_file")
self.api_dlt = settings.API_DLT
self.institution = Institution.objects.filter().first()
if not self.api_dlt:
logger.error("you need set the var API_DLT")
return
self.api_dlt = self.api_dlt.strip("/")
with open(dataset_file) as f:
dataset = json.loads(f.read())
self.add_user(dataset)
def add_user(self, data):
email = data.get("email")
password = data.get("password")
api_token = data.get("api_token")
# ethereum = {"data": {"api_token": api_token}}
# data_eth = json.dumps(ethereum)
data_eth = json.dumps(api_token)
# TODO encrypt in the future
# api_keys_dlt = encrypt(password, data_eth)
api_keys_dlt = data_eth.strip('"').strip("'")
user = User.objects.filter(email=email).first()
if not user:
user = User.objects.create(
email=email,
password=password,
institution = self.institution
)
roles = []
token_dlt = api_token
api = API(self.api_dlt, token_dlt, "ethereum")
result = api.check_user_roles()
if result.get('Status') == 200:
if 'Success' in result.get('Data', {}).get('status'):
rols = result.get('Data', {}).get('data', {})
roles = [(k, k) for k, v in rols.items() if v]
roles_dlt = json.dumps(roles)
UserDpp.objects.create(
roles_dlt=roles_dlt,
api_keys_dlt=api_keys_dlt,
user=user
)

View file

@ -0,0 +1,47 @@
import logging
import requests
from django.core.management.base import BaseCommand
from django.conf import settings
from dpp.models import MemberFederated
logger = logging.getLogger('django')
class Command(BaseCommand):
help = "Synchronize members of DLT"
def handle(self, *args, **kwargs):
api = settings.API_RESOLVER
if not api:
logger.error("you need set the var API_RESOLVER")
return
api = api.strip("/")
url = api + '/getAll'
res = requests.get(url)
if res.status_code != 200:
return "Error, {}".format(res.text)
response = res.json()
members = response['url']
counter = members.pop('counter')
if counter <= MemberFederated.objects.count():
logger.info("Synchronize members of DLT -> All Ok")
return "All ok"
for k, v in members.items():
id = self.clean_id(k)
member = MemberFederated.objects.filter(dlt_id_provider=id).first()
if member:
if member.domain != v:
member.domain = v
member.save()
continue
MemberFederated.objects.create(dlt_id_provider=id, domain=v)
return res.text
def clean_id(self, id):
return int(id.split('DH')[-1])

View file

@ -0,0 +1,52 @@
# Generated by Django 5.0.6 on 2024-11-18 14:29
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
("user", "0001_initial"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name="Proof",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("timestamp", models.IntegerField()),
("uuid", models.UUIDField()),
("signature", models.CharField(max_length=256)),
("type", models.CharField(max_length=256)),
(
"issuer",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
to="user.institution",
),
),
(
"user",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
to=settings.AUTH_USER_MODEL,
),
),
],
),
]

View file

@ -0,0 +1,25 @@
# Generated by Django 5.0.6 on 2024-11-19 19:18
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("dpp", "0001_initial"),
]
operations = [
migrations.CreateModel(
name="MemberFederated",
fields=[
(
"dlt_id_provider",
models.IntegerField(primary_key=True, serialize=False),
),
("domain", models.CharField(max_length=256)),
("client_id", models.CharField(max_length=256)),
("client_secret", models.CharField(max_length=256)),
],
),
]

View file

@ -0,0 +1,60 @@
# Generated by Django 5.0.6 on 2024-11-20 10:51
import django.db.models.deletion
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("dpp", "0002_memberfederated"),
("user", "0001_initial"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.AddField(
model_name="memberfederated",
name="institution",
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
to="user.institution",
),
),
migrations.AlterField(
model_name="memberfederated",
name="client_id",
field=models.CharField(max_length=256, null=True),
),
migrations.AlterField(
model_name="memberfederated",
name="client_secret",
field=models.CharField(max_length=256, null=True),
),
migrations.CreateModel(
name="UserDpp",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
("roles_dlt", models.TextField()),
("api_keys_dlt", models.TextField()),
(
"user",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
to=settings.AUTH_USER_MODEL,
),
),
],
),
]

View file

32
dpp/models.py Normal file
View file

@ -0,0 +1,32 @@
from django.db import models
from user.models import User, Institution
from utils.constants import STR_EXTEND_SIZE
# Create your models here.
class Proof(models.Model):
## The signature can be a phid or dpp depending of type of Proof
timestamp = models.IntegerField()
uuid = models.UUIDField()
signature = models.CharField(max_length=STR_EXTEND_SIZE)
type = models.CharField(max_length=STR_EXTEND_SIZE)
issuer = models.ForeignKey(Institution, on_delete=models.CASCADE)
user = models.ForeignKey(
User, on_delete=models.SET_NULL, null=True, blank=True)
class MemberFederated(models.Model):
dlt_id_provider = models.IntegerField(primary_key=True)
domain = models.CharField(max_length=STR_EXTEND_SIZE)
# This client_id and client_secret is used for connected to this domain as
# a client and this domain then is the server of auth
client_id = models.CharField(max_length=STR_EXTEND_SIZE, null=True)
client_secret = models.CharField(max_length=STR_EXTEND_SIZE, null=True)
institution = models.ForeignKey(
Institution, on_delete=models.SET_NULL, null=True, blank=True)
class UserDpp(models.Model):
roles_dlt = models.TextField()
api_keys_dlt = models.TextField()
user = models.ForeignKey(User, on_delete=models.CASCADE)

3
dpp/tests.py Normal file
View file

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

8
dpp/urls.py Normal file
View file

@ -0,0 +1,8 @@
from django.urls import path
from dpp import views
app_name = 'dpp'
urlpatterns = [
path("<int:proof_id>/", views.ProofView.as_view(), name="proof"),
]

40
dpp/views.py Normal file
View file

@ -0,0 +1,40 @@
import json
import logging
import hashlib
from django.views.generic.edit import View
from django.http import JsonResponse
from dpp.api_dlt import ALGORITHM
from evidence.models import Evidence
from evidence.parse import Build
from dpp.models import Proof
class ProofView(View):
def get(self, request, *args, **kwargs):
timestamp = kwargs.get("proof_id")
proof = Proof.objects.filter(timestamp=timestamp).first()
if not proof:
return JsonResponse({}, status=404)
ev = Evidence(proof.uuid)
if not ev.doc:
return JsonResponse({}, status=404)
dev = Build(ev.doc, None, check=True)
doc = dev.get_phid()
data = {
"algorithm": ALGORITHM,
"document": json.dumps(doc)
}
d = {
'@context': ['https://ereuse.org/proof0.json'],
'data': data,
}
response = JsonResponse(d, status=200)
response["Access-Control-Allow-Origin"] = "*"
return response

View file

@ -29,17 +29,17 @@ class UploadForm(forms.Form):
try: try:
file_json = json.loads(file_data) file_json = json.loads(file_data)
Build(file_json, None, check=True) snap = Build(file_json, None, check=True)
exist_annotation = Annotation.objects.filter( exist_annotation = Annotation.objects.filter(
uuid=file_json['uuid'] uuid=snap.uuid
).first() ).first()
if exist_annotation: if exist_annotation:
raise ValidationError( raise ValidationError(
_("The snapshot already exists"), _("The snapshot already exists"),
code="duplicate_snapshot", code="duplicate_snapshot",
) )
#Catch any error and display it as Validation Error so the Form handles it #Catch any error and display it as Validation Error so the Form handles it
except Exception as e: except Exception as e:
raise ValidationError( raise ValidationError(
@ -221,7 +221,7 @@ class EraseServerForm(forms.Form):
if self.instance: if self.instance:
return return
Annotation.objects.create( Annotation.objects.create(
uuid=self.uuid, uuid=self.uuid,
type=Annotation.Type.ERASE_SERVER, type=Annotation.Type.ERASE_SERVER,

View file

@ -1,11 +1,12 @@
import json import json
import hashlib
from dmidecode import DMIParse from dmidecode import DMIParse
from django.db import models from django.db import models
from utils.constants import STR_EXTEND_SIZE, CHASSIS_DH from utils.constants import STR_EXTEND_SIZE, CHASSIS_DH
from evidence.xapian import search from evidence.xapian import search
from evidence.parse_details import ParseSnapshot from evidence.parse_details import ParseSnapshot, get_inxi, get_inxi_key
from user.models import User, Institution from user.models import User, Institution
@ -39,6 +40,7 @@ class Evidence:
self.doc = None self.doc = None
self.created = None self.created = None
self.dmi = None self.dmi = None
self.inxi = None
self.annotations = [] self.annotations = []
self.components = [] self.components = []
self.default = "n/a" self.default = "n/a"
@ -58,10 +60,19 @@ class Evidence:
if a: if a:
self.owner = a.owner self.owner = a.owner
def get_phid(self):
if not self.doc:
self.get_doc()
return hashlib.sha3_256(json.dumps(self.doc)).hexdigest()
def get_doc(self): def get_doc(self):
self.doc = {} self.doc = {}
self.inxi = None
if not self.owner: if not self.owner:
self.get_owner() self.get_owner()
qry = 'uuid:"{}"'.format(self.uuid) qry = 'uuid:"{}"'.format(self.uuid)
matches = search(self.owner, qry, limit=1) matches = search(self.owner, qry, limit=1)
if matches and matches.size() < 0: if matches and matches.size() < 0:
@ -70,9 +81,36 @@ class Evidence:
for xa in matches: for xa in matches:
self.doc = json.loads(xa.document.get_data()) self.doc = json.loads(xa.document.get_data())
if not self.is_legacy(): if self.is_legacy():
return
if self.doc.get("credentialSubject"):
for ev in self.doc["evidence"]:
if "dmidecode" == ev.get("operation"):
dmidecode_raw = ev["output"]
if "inxi" == ev.get("operation"):
self.inxi = ev["output"]
else:
dmidecode_raw = self.doc["data"]["dmidecode"] dmidecode_raw = self.doc["data"]["dmidecode"]
inxi_raw = self.doc["data"]["inxi"]
self.dmi = DMIParse(dmidecode_raw) self.dmi = DMIParse(dmidecode_raw)
try:
self.inxi = json.loads(inxi_raw)
except Exception:
pass
if self.inxi:
try:
machine = get_inxi_key(self.inxi, 'Machine')
for m in machine:
system = get_inxi(m, "System")
if system:
self.device_manufacturer = system
self.device_model = get_inxi(m, "product")
self.device_serial_number = get_inxi(m, "serial")
self.device_chassis = get_inxi(m, "Type")
self.device_version = get_inxi(m, "v")
except Exception:
return
def get_time(self): def get_time(self):
if not self.doc: if not self.doc:
@ -98,6 +136,9 @@ class Evidence:
if self.is_legacy(): if self.is_legacy():
return self.doc['device']['manufacturer'] return self.doc['device']['manufacturer']
if self.inxi:
return self.device_manufacturer
return self.dmi.manufacturer().strip() return self.dmi.manufacturer().strip()
def get_model(self): def get_model(self):
@ -110,12 +151,18 @@ class Evidence:
if self.is_legacy(): if self.is_legacy():
return self.doc['device']['model'] return self.doc['device']['model']
if self.inxi:
return self.device_model
return self.dmi.model().strip() return self.dmi.model().strip()
def get_chassis(self): def get_chassis(self):
if self.is_legacy(): if self.is_legacy():
return self.doc['device']['model'] return self.doc['device']['model']
if self.inxi:
return self.device_chassis
chassis = self.dmi.get("Chassis")[0].get("Type", '_virtual') chassis = self.dmi.get("Chassis")[0].get("Type", '_virtual')
lower_type = chassis.lower() lower_type = chassis.lower()
@ -127,8 +174,18 @@ class Evidence:
def get_serial_number(self): def get_serial_number(self):
if self.is_legacy(): if self.is_legacy():
return self.doc['device']['serialNumber'] return self.doc['device']['serialNumber']
if self.inxi:
return self.device_serial_number
return self.dmi.serial_number().strip() return self.dmi.serial_number().strip()
def get_version(self):
if self.inxi:
return self.device_version
return ""
@classmethod @classmethod
def get_all(cls, user): def get_all(cls, user):
return Annotation.objects.filter( return Annotation.objects.filter(
@ -142,6 +199,9 @@ class Evidence:
self.components = snapshot['components'] self.components = snapshot['components']
def is_legacy(self): def is_legacy(self):
if self.doc.get("credentialSubject"):
return False
return self.doc.get("software") != "workbench-script" return self.doc.get("software") != "workbench-script"
def is_web_snapshot(self): def is_web_snapshot(self):

View file

@ -3,43 +3,48 @@ import hashlib
import logging import logging
from dmidecode import DMIParse from dmidecode import DMIParse
from json_repair import repair_json from evidence.parse_details import ParseSnapshot
from evidence.parse_details import get_lshw_child
from evidence.models import Annotation from evidence.models import Annotation
from evidence.xapian import index from evidence.xapian import index
from utils.constants import CHASSIS_DH from evidence.parse_details import get_inxi_key, get_inxi
from django.conf import settings
if settings.DPP:
from dpp.api_dlt import register_device_dlt, register_passport_dlt
logger = logging.getLogger('django') logger = logging.getLogger('django')
def get_mac(lshw):
try:
if type(lshw) is dict:
hw = lshw
else:
hw = json.loads(lshw)
except json.decoder.JSONDecodeError:
hw = json.loads(repair_json(lshw))
nets = [] def get_mac(inxi):
get_lshw_child(hw, nets, 'network') nets = get_inxi_key(inxi, "Network")
networks = [(nets[i], nets[i + 1]) for i in range(0, len(nets) - 1, 2)]
nets_sorted = sorted(nets, key=lambda x: x['businfo'])
if nets_sorted:
mac = nets_sorted[0]['serial']
logger.debug("The snapshot has the following MAC: %s" , mac)
return mac
for n, iface in networks:
if get_inxi(n, "port"):
return get_inxi(iface, 'mac')
class Build: class Build:
def __init__(self, evidence_json, user, check=False): def __init__(self, evidence_json, user, check=False):
self.json = evidence_json self.evidence = evidence_json.copy()
self.json = evidence_json.copy()
if evidence_json.get("credentialSubject"):
self.json.update(evidence_json["credentialSubject"])
if evidence_json.get("evidence"):
self.json["data"] = {}
for ev in evidence_json["evidence"]:
k = ev.get("operation")
if not k:
continue
self.json["data"][k] = ev.get("output")
self.uuid = self.json['uuid'] self.uuid = self.json['uuid']
self.user = user self.user = user
self.hid = None self.hid = None
self.chid = None
self.phid = self.get_signature(self.json)
self.generate_chids() self.generate_chids()
if check: if check:
@ -47,14 +52,17 @@ class Build:
self.index() self.index()
self.create_annotations() self.create_annotations()
if settings.DPP:
self.register_device_dlt()
def index(self): def index(self):
snap = json.dumps(self.json) snap = json.dumps(self.evidence)
index(self.user.institution, self.uuid, snap) index(self.user.institution, self.uuid, snap)
def generate_chids(self): def generate_chids(self):
self.algorithms = { self.algorithms = {
'hidalgo1': self.get_hid_14(), 'hidalgo1': self.get_hid_14(),
'legacy_dpp': self.get_chid_dpp(),
} }
def get_hid_14(self): def get_hid_14(self):
@ -69,8 +77,51 @@ class Build:
sku = device.get("sku", '') sku = device.get("sku", '')
hid = f"{manufacturer}{model}{chassis}{serial_number}{sku}" hid = f"{manufacturer}{model}{chassis}{serial_number}{sku}"
self.chid = hashlib.sha3_256(hid.encode()).hexdigest()
return self.chid
return hashlib.sha3_256(hid.encode()).hexdigest() def get_chid_dpp(self):
if self.json.get("software") == "workbench-script":
device = ParseSnapshot(self.json).device
else:
device = self.json['device']
hid = self.get_id_hw_dpp(device)
self.chid = hashlib.sha3_256(hid.encode("utf-8")).hexdigest()
return self.chid
def get_id_hw_dpp(self, d):
manufacturer = d.get("manufacturer", '')
model = d.get("model", '')
chassis = d.get("chassis", '')
serial_number = d.get("serialNumber", '')
sku = d.get("sku", '')
typ = d.get("type", '')
version = d.get("version", '')
return f"{manufacturer}{model}{chassis}{serial_number}{sku}{typ}{version}"
def get_phid(self):
if self.json.get("software") == "workbench-script":
data = ParseSnapshot(self.json)
self.device = data.device
self.components = data.components
else:
self.device = self.json.get("device")
self.components = self.json.get("components", [])
self.device.pop("actions", None)
for c in self.components:
c.pop("actions", None)
device = self.get_id_hw_dpp(self.device)
components = sorted(self.components, key=lambda x: x.get("type"))
doc = [("computer", device)]
for c in components:
doc.append((c.get("type"), self.get_id_hw_dpp(c)))
return doc
def create_annotations(self): def create_annotations(self):
annotation = Annotation.objects.filter( annotation = Annotation.objects.filter(
@ -94,38 +145,39 @@ class Build:
value=v value=v
) )
def get_chassis_dh(self):
chassis = self.get_chassis()
lower_type = chassis.lower()
for k, v in CHASSIS_DH.items():
if lower_type in v:
return k
return self.default
def get_sku(self):
return self.dmi.get("System")[0].get("SKU Number", "n/a").strip()
def get_chassis(self):
return self.dmi.get("Chassis")[0].get("Type", '_virtual')
def get_hid(self, snapshot): def get_hid(self, snapshot):
dmidecode_raw = snapshot["data"]["dmidecode"] try:
self.dmi = DMIParse(dmidecode_raw) self.inxi = self.json["data"]["inxi"]
if isinstance(self.inxi, str):
self.inxi = json.loads(self.inxi)
except Exception:
logger.error("No inxi in snapshot %s", self.uuid)
return ""
manufacturer = self.dmi.manufacturer().strip() machine = get_inxi_key(self.inxi, 'Machine')
model = self.dmi.model().strip() for m in machine:
chassis = self.get_chassis_dh() system = get_inxi(m, "System")
serial_number = self.dmi.serial_number() if system:
sku = self.get_sku() manufacturer = system
model = get_inxi(m, "product")
serial_number = get_inxi(m, "serial")
chassis = get_inxi(m, "Type")
else:
sku = get_inxi(m, "part-nu")
if not snapshot["data"].get('lshw'): mac = get_mac(self.inxi) or ""
return f"{manufacturer}{model}{chassis}{serial_number}{sku}"
lshw = snapshot["data"]["lshw"]
# mac = get_mac2(hwinfo_raw) or ""
mac = get_mac(lshw) or ""
if not mac: if not mac:
txt = "Could not retrieve MAC address in snapshot %s" txt = "Could not retrieve MAC address in snapshot %s"
logger.warning(txt, snapshot['uuid']) logger.warning(txt, snapshot['uuid'])
return f"{manufacturer}{model}{chassis}{serial_number}{sku}"
return f"{manufacturer}{model}{chassis}{serial_number}{sku}{mac}" return f"{manufacturer}{model}{chassis}{serial_number}{sku}{mac}"
def get_signature(self, doc):
return hashlib.sha3_256(json.dumps(doc).encode()).hexdigest()
def register_device_dlt(self):
chid = self.algorithms.get('legacy_dpp')
phid = self.get_signature(self.get_phid())
register_device_dlt(chid, phid, self.uuid, self.user)
register_passport_dlt(chid, phid, self.uuid, self.user)

View file

@ -1,10 +1,10 @@
import re
import json import json
import logging import logging
import numpy as np import numpy as np
from datetime import datetime from datetime import datetime
from dmidecode import DMIParse from dmidecode import DMIParse
from json_repair import repair_json
from utils.constants import CHASSIS_DH, DATASTORAGEINTERFACE from utils.constants import CHASSIS_DH, DATASTORAGEINTERFACE
@ -12,322 +12,345 @@ from utils.constants import CHASSIS_DH, DATASTORAGEINTERFACE
logger = logging.getLogger('django') logger = logging.getLogger('django')
def get_lshw_child(child, nets, component): def get_inxi_key(inxi, component):
if child.get('id') == component: for n in inxi:
nets.append(child) for k, v in n.items():
if child.get('children'): if component in k:
[get_lshw_child(x, nets, component) for x in child['children']] return v
def get_inxi(n, name):
for k, v in n.items():
if f"#{name}" in k:
return v
return ""
class ParseSnapshot: class ParseSnapshot:
def __init__(self, snapshot, default="n/a"): def __init__(self, snapshot, default="n/a"):
self.default = default self.default = default
self.dmidecode_raw = snapshot["data"].get("dmidecode", "{}") self.dmidecode_raw = snapshot.get("data", {}).get("dmidecode", "{}")
self.smart_raw = snapshot["data"].get("disks", []) self.smart_raw = snapshot.get("data", {}).get("smartctl", [])
self.hwinfo_raw = snapshot["data"].get("hwinfo", "") self.inxi_raw = snapshot.get("data", {}).get("inxi", "") or ""
self.lshw_raw = snapshot["data"].get("lshw", {}) or {} for ev in snapshot.get("evidence", []):
self.lscpi_raw = snapshot["data"].get("lspci", "") if "dmidecode" == ev.get("operation"):
self.dmidecode_raw = ev["output"]
if "inxi" == ev.get("operation"):
self.inxi_raw = ev["output"]
if "smartctl" == ev.get("operation"):
self.smart_raw = ev["output"]
data = snapshot
if snapshot.get("credentialSubject"):
data = snapshot["credentialSubject"]
self.device = {"actions": []} self.device = {"actions": []}
self.components = [] self.components = []
self.monitors = []
self.dmi = DMIParse(self.dmidecode_raw) self.dmi = DMIParse(self.dmidecode_raw)
self.smart = self.loads(self.smart_raw) self.smart = self.loads(self.smart_raw)
self.lshw = self.loads(self.lshw_raw) self.inxi = self.loads(self.inxi_raw)
self.hwinfo = self.parse_hwinfo()
self.set_computer() self.set_computer()
self.get_hwinfo_monitors()
self.set_components() self.set_components()
self.snapshot_json = { self.snapshot_json = {
"type": "Snapshot", "type": "Snapshot",
"device": self.device, "device": self.device,
"software": snapshot["software"], "software": data["software"],
"components": self.components, "components": self.components,
"uuid": snapshot['uuid'], "uuid": data['uuid'],
"version": snapshot['version'], "endTime": data["timestamp"],
"endTime": snapshot["timestamp"],
"elapsed": 1, "elapsed": 1,
} }
def set_computer(self): def set_computer(self):
self.device['manufacturer'] = self.dmi.manufacturer().strip() machine = get_inxi_key(self.inxi, 'Machine') or []
self.device['model'] = self.dmi.model().strip() for m in machine:
self.device['serialNumber'] = self.dmi.serial_number() system = get_inxi(m, "System")
self.device['type'] = self.get_type() if system:
self.device['sku'] = self.get_sku() self.device['manufacturer'] = system
self.device['version'] = self.get_version() self.device['model'] = get_inxi(m, "product")
self.device['system_uuid'] = self.get_uuid() self.device['serialNumber'] = get_inxi(m, "serial")
self.device['family'] = self.get_family() self.device['type'] = get_inxi(m, "Type")
self.device['chassis'] = self.get_chassis_dh() self.device['chassis'] = self.device['type']
self.device['version'] = get_inxi(m, "v")
else:
self.device['system_uuid'] = get_inxi(m, "uuid")
self.device['sku'] = get_inxi(m, "part-nu")
def set_components(self): def set_components(self):
self.get_mother_board()
self.get_cpu() self.get_cpu()
self.get_ram() self.get_ram()
self.get_mother_board()
self.get_graphic() self.get_graphic()
self.get_data_storage()
self.get_display() self.get_display()
self.get_sound_card()
self.get_networks() self.get_networks()
self.get_sound_card()
def get_cpu(self): self.get_data_storage()
for cpu in self.dmi.get('Processor'): self.get_battery()
serial = cpu.get('Serial Number')
if serial == 'Not Specified' or not serial:
serial = cpu.get('ID').replace(' ', '')
self.components.append(
{
"actions": [],
"type": "Processor",
"speed": self.get_cpu_speed(cpu),
"cores": int(cpu.get('Core Count', 1)),
"model": cpu.get('Version'),
"threads": int(cpu.get('Thread Count', 1)),
"manufacturer": cpu.get('Manufacturer'),
"serialNumber": serial,
"brand": cpu.get('Family'),
"address": self.get_cpu_address(cpu),
"bogomips": self.get_bogomips(),
}
)
def get_ram(self):
for ram in self.dmi.get("Memory Device"):
if ram.get('size') == 'No Module Installed':
continue
if not ram.get("Speed"):
continue
self.components.append(
{
"actions": [],
"type": "RamModule",
"size": self.get_ram_size(ram),
"speed": self.get_ram_speed(ram),
"manufacturer": ram.get("Manufacturer", self.default),
"serialNumber": ram.get("Serial Number", self.default),
"interface": ram.get("Type", "DDR"),
"format": ram.get("Form Factor", "DIMM"),
"model": ram.get("Part Number", self.default),
}
)
def get_mother_board(self): def get_mother_board(self):
for moder_board in self.dmi.get("Baseboard"): machine = get_inxi_key(self.inxi, 'Machine') or []
self.components.append( mb = {"type": "Motherboard",}
{ for m in machine:
"actions": [], bios_date = get_inxi(m, "date")
"type": "Motherboard", if not bios_date:
"version": moder_board.get("Version"), continue
"serialNumber": moder_board.get("Serial Number", "").strip(), mb["manufacturer"] = get_inxi(m, "Mobo")
"manufacturer": moder_board.get("Manufacturer", "").strip(), mb["model"] = get_inxi(m, "model")
"biosDate": self.get_bios_date(), mb["serialNumber"] = get_inxi(m, "serial")
"ramMaxSize": self.get_max_ram_size(), mb["version"] = get_inxi(m, "v")
"ramSlots": len(self.dmi.get("Memory Device")), mb["biosDate"] = bios_date
"slots": self.get_ram_slots(), mb["biosVersion"] = self.get_bios_version()
"model": moder_board.get("Product Name", "").strip(), mb["firewire"]: self.get_firmware_num()
"firewire": self.get_firmware_num(), mb["pcmcia"]: self.get_pcmcia_num()
"pcmcia": self.get_pcmcia_num(), mb["serial"]: self.get_serial_num()
"serial": self.get_serial_num(), mb["usb"]: self.get_usb_num()
"usb": self.get_usb_num(),
} self.get_ram_slots(mb)
)
self.components.append(mb)
def get_ram_slots(self, mb):
memory = get_inxi_key(self.inxi, 'Memory') or []
for m in memory:
slots = get_inxi(m, "slots")
if not slots:
continue
mb["slots"] = slots
mb["ramSlots"] = get_inxi(m, "modules")
mb["ramMaxSize"] = get_inxi(m, "capacity")
def get_cpu(self):
cpu = get_inxi_key(self.inxi, 'CPU') or []
cp = {"type": "Processor"}
vulnerabilities = []
for c in cpu:
base = get_inxi(c, "model")
if base:
cp["model"] = get_inxi(c, "model")
cp["arch"] = get_inxi(c, "arch")
cp["bits"] = get_inxi(c, "bits")
cp["gen"] = get_inxi(c, "gen")
cp["family"] = get_inxi(c, "family")
cp["date"] = get_inxi(c, "built")
continue
des = get_inxi(c, "L1")
if des:
cp["L1"] = des
cp["L2"] = get_inxi(c, "L2")
cp["L3"] = get_inxi(c, "L3")
cp["cpus"] = get_inxi(c, "cpus")
cp["cores"] = get_inxi(c, "cores")
cp["threads"] = get_inxi(c, "threads")
continue
bogo = get_inxi(c, "bogomips")
if bogo:
cp["bogomips"] = bogo
cp["base/boost"] = get_inxi(c, "base/boost")
cp["min/max"] = get_inxi(c, "min/max")
cp["ext-clock"] = get_inxi(c, "ext-clock")
cp["volts"] = get_inxi(c, "volts")
continue
ctype = get_inxi(c, "Type")
if ctype:
v = {"Type": ctype}
status = get_inxi(c, "status")
if status:
v["status"] = status
mitigation = get_inxi(c, "mitigation")
if mitigation:
v["mitigation"] = mitigation
vulnerabilities.append(v)
self.components.append(cp)
def get_ram(self):
memory = get_inxi_key(self.inxi, 'Memory') or []
mem = {"type": "RamModule"}
for m in memory:
base = get_inxi(m, "System RAM")
if base:
mem["size"] = get_inxi(m, "total")
slot = get_inxi(m, "manufacturer")
if slot:
mem["manufacturer"] = slot
mem["model"] = get_inxi(m, "part-no")
mem["serialNumber"] = get_inxi(m, "serial")
mem["speed"] = get_inxi(m, "speed")
mem["bits"] = get_inxi(m, "data")
mem["interface"] = get_inxi(m, "type")
module = get_inxi(m, "modules")
if module:
mem["modules"] = module
self.components.append(mem)
def get_graphic(self): def get_graphic(self):
displays = [] graphics = get_inxi_key(self.inxi, 'Graphics') or []
get_lshw_child(self.lshw, displays, 'display')
for c in graphics:
for c in displays: if not get_inxi(c, "Device") or not get_inxi(c, "vendor"):
if not c['configuration'].get('driver', None):
continue continue
self.components.append( self.components.append(
{ {
"actions": [],
"type": "GraphicCard", "type": "GraphicCard",
"memory": self.get_memory_video(c), "memory": self.get_memory_video(c),
"manufacturer": c.get("vendor", self.default), "manufacturer": get_inxi(c, "vendor"),
"model": c.get("product", self.default), "model": get_inxi(c, "Device"),
"serialNumber": c.get("serial", self.default), "arch": get_inxi(c, "arch"),
"serialNumber": get_inxi(c, "serial"),
"integrated": True if get_inxi(c, "port") else False
}
)
def get_battery(self):
bats = get_inxi_key(self.inxi, 'Battery') or []
for b in bats:
self.components.append(
{
"type": "Battery",
"model": get_inxi(b, "model"),
"serialNumber": get_inxi(b, "serial"),
"condition": get_inxi(b, "condition"),
"cycles": get_inxi(b, "cycles"),
"volts": get_inxi(b, "volts")
} }
) )
def get_memory_video(self, c): def get_memory_video(self, c):
# get info of lspci memory = get_inxi_key(self.inxi, 'Memory') or []
# pci_id = c['businfo'].split('@')[1]
# lspci.get(pci_id) | grep size for m in memory:
# lspci -v -s 00:02.0 igpu = get_inxi(m, "igpu")
return None agpu = get_inxi(m, "agpu")
ngpu = get_inxi(m, "ngpu")
gpu = get_inxi(m, "gpu")
if igpu or agpu or gpu or ngpu:
return igpu or agpu or gpu or ngpu
return self.default
def get_data_storage(self): def get_data_storage(self):
for sm in self.smart: hdds= get_inxi_key(self.inxi, 'Drives') or []
if sm.get('smartctl', {}).get('exit_status') == 1: for d in hdds:
usb = get_inxi(d, "type")
if usb == "USB":
continue continue
model = sm.get('model_name')
manufacturer = None
hours = sm.get("power_on_time", {}).get("hours", 0)
if model and len(model.split(" ")) > 1:
mm = model.split(" ")
model = mm[-1]
manufacturer = " ".join(mm[:-1])
self.components.append( serial = get_inxi(d, "serial")
{ if serial:
"actions": self.sanitize(sm), hd = {
"type": self.get_data_storage_type(sm), "type": "Storage",
"model": model, "manufacturer": get_inxi(d, "vendor"),
"manufacturer": manufacturer, "model": get_inxi(d, "model"),
"serialNumber": sm.get('serial_number'), "serialNumber": get_inxi(d, "serial"),
"size": self.get_data_storage_size(sm), "size": get_inxi(d, "size"),
"variant": sm.get("firmware_version"), "speed": get_inxi(d, "speed"),
"interface": self.get_data_storage_interface(sm), "interface": get_inxi(d, "tech"),
"hours": hours, "firmware": get_inxi(d, "fw-rev")
} }
) rpm = get_inxi(d, "rpm")
if rpm:
hd["rpm"] = rpm
family = get_inxi(d, "family")
if family:
hd["family"] = family
sata = get_inxi(d, "sata")
if sata:
hd["sata"] = sata
continue
cycles = get_inxi(d, "cycles")
if cycles:
hd['cycles'] = cycles
hd["health"] = get_inxi(d, "health")
hd["time of used"] = get_inxi(d, "on")
hd["read used"] = get_inxi(d, "read-units")
hd["written used"] = get_inxi(d, "written-units")
self.components.append(hd)
continue
hd = {}
def sanitize(self, action): def sanitize(self, action):
return [] return []
def get_bogomips(self):
if not self.hwinfo:
return self.default
bogomips = 0
for row in self.hwinfo:
for cel in row:
if 'BogoMips' in cel:
try:
bogomips += float(cel.split(":")[-1])
except:
pass
return bogomips
def get_networks(self): def get_networks(self):
networks = [] nets = get_inxi_key(self.inxi, "Network") or []
get_lshw_child(self.lshw, networks, 'network') networks = [(nets[i], nets[i + 1]) for i in range(0, len(nets) - 1, 2)]
for c in networks: for n, iface in networks:
capacity = c.get('capacity') model = get_inxi(n, "Device")
wireless = bool(c.get('configuration', {}).get('wireless', False)) if not model:
continue
interface = ''
for k in n.keys():
if "port" in k:
interface = "Integrated"
if "pcie" in k:
interface = "PciExpress"
if get_inxi(n, "type") == "USB":
interface = "USB"
self.components.append( self.components.append(
{ {
"actions": [],
"type": "NetworkAdapter", "type": "NetworkAdapter",
"model": c.get('product'), "model": model,
"manufacturer": c.get('vendor'), "manufacturer": get_inxi(n, 'vendor'),
"serialNumber": c.get('serial'), "serialNumber": get_inxi(iface, 'mac'),
"speed": capacity, "speed": get_inxi(n, "speed"),
"variant": c.get('version', 1), "interface": interface,
"wireless": wireless or False,
"integrated": "PCI:0000:00" in c.get("businfo", ""),
} }
) )
def get_sound_card(self): def get_sound_card(self):
multimedias = [] audio = get_inxi_key(self.inxi, "Audio") or []
get_lshw_child(self.lshw, multimedias, 'multimedia')
for c in audio:
for c in multimedias: model = get_inxi(c, "Device")
if not model:
continue
self.components.append( self.components.append(
{ {
"actions": [],
"type": "SoundCard", "type": "SoundCard",
"model": c.get('product'), "model": model,
"manufacturer": c.get('vendor'), "manufacturer": get_inxi(c, 'vendor'),
"serialNumber": c.get('serial'), "serialNumber": get_inxi(c, 'serial'),
} }
) )
def get_display(self): # noqa: C901 def get_display(self):
TECHS = 'CRT', 'TFT', 'LED', 'PDP', 'LCD', 'OLED', 'AMOLED' graphics = get_inxi_key(self.inxi, "Graphics") or []
for c in graphics:
for c in self.monitors: if not get_inxi(c, "Monitor"):
resolution_width, resolution_height = (None,) * 2 continue
refresh, serial, model, manufacturer, size = (None,) * 5
year, week, production_date = (None,) * 3
for x in c:
if "Vendor: " in x:
manufacturer = x.split('Vendor: ')[-1].strip()
if "Model: " in x:
model = x.split('Model: ')[-1].strip()
if "Serial ID: " in x:
serial = x.split('Serial ID: ')[-1].strip()
if " Resolution: " in x:
rs = x.split(' Resolution: ')[-1].strip()
if 'x' in rs:
resolution_width, resolution_height = [
int(r) for r in rs.split('x')
]
if "Frequencies: " in x:
try:
refresh = int(float(x.split(',')[-1].strip()[:-3]))
except Exception:
pass
if 'Year of Manufacture' in x:
year = x.split(': ')[1]
if 'Week of Manufacture' in x:
week = x.split(': ')[1]
if "Size: " in x:
size = self.get_size_monitor(x)
technology = next((t for t in TECHS if t in c[0]), None)
if year and week:
d = '{} {} 0'.format(year, week)
production_date = datetime.strptime(d, '%Y %W %w').isoformat()
self.components.append( self.components.append(
{ {
"actions": [],
"type": "Display", "type": "Display",
"model": model, "model": get_inxi(c, "model"),
"manufacturer": manufacturer, "manufacturer": get_inxi(c, "vendor"),
"serialNumber": serial, "serialNumber": get_inxi(c, "serial"),
'size': size, 'size': get_inxi(c, "size"),
'resolutionWidth': resolution_width, 'diagonal': get_inxi(c, "diag"),
'resolutionHeight': resolution_height, 'resolution': get_inxi(c, "res"),
"productionDate": production_date, "date": get_inxi(c, "built"),
'technology': technology, 'ratio': get_inxi(c, "ratio"),
'refreshRate': refresh,
} }
) )
def get_hwinfo_monitors(self):
for c in self.hwinfo:
monitor = None
external = None
for x in c:
if 'Hardware Class: monitor' in x:
monitor = c
if 'Driver Info' in x:
external = c
if monitor and not external:
self.monitors.append(c)
def get_size_monitor(self, x):
i = 1 / 25.4
t = x.split('Size: ')[-1].strip()
tt = t.split('mm')
if not tt:
return 0
sizes = tt[0].strip()
if 'x' not in sizes:
return 0
w, h = [int(x) for x in sizes.split('x')]
return "{:.2f}".format(np.sqrt(w**2 + h**2) * i)
def get_cpu_address(self, cpu):
default = 64
for ch in self.lshw.get('children', []):
for c in ch.get('children', []):
if c['class'] == 'processor':
return c.get('width', default)
return default
def get_usb_num(self): def get_usb_num(self):
return len( return len(
[ [
@ -364,133 +387,13 @@ class ParseSnapshot:
] ]
) )
def get_bios_date(self): def get_bios_version(self):
return self.dmi.get("BIOS")[0].get("Release Date", self.default) return self.dmi.get("BIOS")[0].get("BIOS Revision", '1')
def get_firmware(self):
return self.dmi.get("BIOS")[0].get("Firmware Revision", '1')
def get_max_ram_size(self):
size = 0
for slot in self.dmi.get("Physical Memory Array"):
capacity = slot.get("Maximum Capacity", '0').split(" ")[0]
size += int(capacity)
return size
def get_ram_slots(self):
slots = 0
for x in self.dmi.get("Physical Memory Array"):
slots += int(x.get("Number Of Devices", 0))
return slots
def get_ram_size(self, ram):
memory = ram.get("Size", "0")
return memory
def get_ram_speed(self, ram):
size = ram.get("Speed", "0")
return size
def get_cpu_speed(self, cpu):
speed = cpu.get('Max Speed', "0")
return speed
def get_sku(self):
return self.dmi.get("System")[0].get("SKU Number", self.default).strip()
def get_version(self):
return self.dmi.get("System")[0].get("Version", self.default).strip()
def get_uuid(self):
return self.dmi.get("System")[0].get("UUID", '').strip()
def get_family(self):
return self.dmi.get("System")[0].get("Family", '')
def get_chassis(self):
return self.dmi.get("Chassis")[0].get("Type", '_virtual')
def get_type(self):
chassis_type = self.get_chassis()
return self.translation_to_devicehub(chassis_type)
def translation_to_devicehub(self, original_type):
lower_type = original_type.lower()
CHASSIS_TYPE = {
'Desktop': [
'desktop',
'low-profile',
'tower',
'docking',
'all-in-one',
'pizzabox',
'mini-tower',
'space-saving',
'lunchbox',
'mini',
'stick',
],
'Laptop': [
'portable',
'laptop',
'convertible',
'tablet',
'detachable',
'notebook',
'handheld',
'sub-notebook',
],
'Server': ['server'],
'Computer': ['_virtual'],
}
for k, v in CHASSIS_TYPE.items():
if lower_type in v:
return k
return self.default
def get_chassis_dh(self):
chassis = self.get_chassis()
lower_type = chassis.lower()
for k, v in CHASSIS_DH.items():
if lower_type in v:
return k
return self.default
def get_data_storage_type(self, x):
# TODO @cayop add more SSDS types
SSDS = ["nvme"]
SSD = 'SolidStateDrive'
HDD = 'HardDrive'
type_dev = x.get('device', {}).get('type')
trim = x.get('trim', {}).get("supported") in [True, "true"]
return SSD if type_dev in SSDS or trim else HDD
def get_data_storage_interface(self, x):
interface = x.get('device', {}).get('protocol', 'ATA')
if interface.upper() in DATASTORAGEINTERFACE:
return interface.upper()
txt = "Sid: {}, interface {} is not in DataStorageInterface Enum".format(
self.sid, interface
)
self.errors("{}".format(err))
def get_data_storage_size(self, x):
return x.get('user_capacity', {}).get('bytes')
def parse_hwinfo(self):
hw_blocks = self.hwinfo_raw.split("\n\n")
return [x.split("\n") for x in hw_blocks]
def loads(self, x): def loads(self, x):
if isinstance(x, str): if isinstance(x, str):
try: try:
try: return json.loads(x)
hw = json.loads(x)
except json.decoder.JSONDecodeError:
hw = json.loads(repair_json(x))
return hw
except Exception as ss: except Exception as ss:
logger.warning("%s", ss) logger.warning("%s", ss)
return {} return {}
@ -502,4 +405,3 @@ class ParseSnapshot:
logger.error(txt) logger.error(txt)
self._errors.append("%s", txt) self._errors.append("%s", txt)

View file

@ -137,7 +137,7 @@ class DownloadEvidenceView(DashboardView, TemplateView):
evidence.get_doc() evidence.get_doc()
data = json.dumps(evidence.doc) data = json.dumps(evidence.doc)
response = HttpResponse(data, content_type="application/json") response = HttpResponse(data, content_type="application/json")
response['Content-Disposition'] = 'attachment; filename={}'.format("credential.json") response['Content-Disposition'] = 'attachment; filename={}'.format("evidence.json")
return response return response

View file

@ -22,10 +22,14 @@ def search(institution, qs, offset=0, limit=10):
qp.set_stemming_strategy(xapian.QueryParser.STEM_SOME) qp.set_stemming_strategy(xapian.QueryParser.STEM_SOME)
qp.add_prefix("uuid", "uuid") qp.add_prefix("uuid", "uuid")
query = qp.parse_query(qs) query = qp.parse_query(qs)
institution_term = "U{}".format(institution.id) if institution:
final_query = xapian.Query( institution_term = "U{}".format(institution.id)
xapian.Query.OP_AND, query, xapian.Query(institution_term) final_query = xapian.Query(
) xapian.Query.OP_AND, query, xapian.Query(institution_term)
)
else:
final_query = xapian.Query(query)
enquire = xapian.Enquire(database) enquire = xapian.Enquire(database)
enquire.set_query(final_query) enquire.set_query(final_query)
matches = enquire.get_mset(offset, limit) matches = enquire.get_mset(offset, limit)

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1 @@
{"closed": true, "components": [{"actions": [], "manufacturer": "Intel Corporation", "model": "82579LM Gigabit Network Connection", "serialNumber": "00:11:11:11:11:00", "speed": 1000.0, "type": "NetworkAdapter", "variant": "04", "wireless": false}, {"actions": [], "manufacturer": "Intel Corporation", "model": "7 Series/C216 Chipset Family High Definition Audio Controller", "serialNumber": null, "type": "SoundCard"}, {"actions": [], "format": "DIMM", "interface": "DDR3", "manufacturer": "Micron", "model": "16KTF51264AZ", "serialNumber": "AAAAAAAA", "size": 4096.0, "speed": 1600.0, "type": "RamModule"}, {"actions": [{"endTime": "2022-10-11T13:45:31.239555+00:00", "severity": "Info", "startTime": "2021-10-11T09:45:19.623967+00:00", "steps": [{"endTime": "2021-10-11T11:05:28.090897+00:00", "severity": "Info", "startTime": "2021-10-11T09:45:19.624163+00:00", "type": "StepZero"}, {"endTime": "2021-10-11T13:45:31.239402+00:00", "severity": "Info", "startTime": "2021-10-11T11:05:28.091255+00:00", "type": "StepRandom"}], "type": "EraseSectors"}, {"assessment": true, "commandTimeout": 30, "currentPendingSectorCount": 0, "elapsed": 60, "length": "Short", "lifetime": 18720, "offlineUncorrectable": 0, "powerCycleCount": 2147, "reallocatedSectorCount": 0, "reportedUncorrectableErrors": 0, "severity": "Info", "status": "Completed without error", "type": "TestDataStorage"}, {"elapsed": 11, "readSpeed": 119.0, "type": "BenchmarkDataStorage", "writeSpeed": 32.7}], "interface": "ATA", "manufacturer": "Seagate", "model": "ST3500418AS", "serialNumber": "AAAAAAAA", "size": 500000.0, "type": "HardDrive", "variant": "CC46"}, {"actions": [{"elapsed": 0, "rate": 25540.36, "type": "BenchmarkProcessor"}, {"elapsed": 8, "rate": 7.6939, "type": "BenchmarkProcessorSysbench"}], "address": 64, "brand": "Core i5", "cores": 4, "generation": 3, "manufacturer": "Intel Corp.", "model": "Intel Core i5-3470 CPU @ 3.20GHz", "serialNumber": null, "speed": 1.6242180000000002, "threads": 4, "type": "Processor"}, {"actions": [], "manufacturer": "Intel Corporation", "memory": null, "model": "Xeon E3-1200 v2/3rd Gen Core processor Graphics Controller", "serialNumber": null, "type": "GraphicCard"}, {"actions": [], "biosDate": "2012-08-07T00:00:00", "firewire": 0, "manufacturer": "LENOVO", "model": "MAHOBAY", "pcmcia": 0, "ramMaxSize": 32, "ramSlots": 4, "serial": 1, "serialNumber": null, "slots": 4, "type": "Motherboard", "usb": 3, "version": "9SKT39AUS"}], "device": {"actions": [{"elapsed": 1, "rate": 0.6507, "type": "BenchmarkRamSysbench"}], "chassis": "Tower", "manufacturer": "LENOVO", "model": "3227A2G", "serialNumber": "AAAAAAAA", "sku": "LENOVO_MT_3227", "type": "Desktop", "version": "ThinkCentre M92P"}, "elapsed": 187302510, "endTime": "2016-11-03T17:17:01.116554+00:00", "software": "Workbench", "type": "Snapshot", "uuid": "ae913de1-e639-476a-ad9b-78eabbe4628b", "version": "11.0b11"}

View file

@ -0,0 +1 @@
{"closed": true, "components": [{"actions": [], "manufacturer": "Intel Corporation", "model": "82579LM Gigabit Network Connection", "serialNumber": "00:11:11:11:11:00", "speed": 1000.0, "type": "NetworkAdapter", "variant": "04", "wireless": false}, {"actions": [], "manufacturer": "Intel Corporation", "model": "7 Series/C216 Chipset Family High Definition Audio Controller", "serialNumber": null, "type": "SoundCard"}, {"actions": [], "format": "DIMM", "interface": "DDR3", "manufacturer": "Micron", "model": "16KTF51264AZ", "serialNumber": "AAAAAAAA", "size": 4096.0, "speed": 1600.0, "type": "RamModule"}, {"actions": [{"endTime": "2022-10-11T13:45:31.239555+00:00", "severity": "Info", "startTime": "2021-10-11T09:45:19.623967+00:00", "steps": [{"endTime": "2021-10-11T11:05:28.090897+00:00", "severity": "Info", "startTime": "2021-10-11T09:45:19.624163+00:00", "type": "StepZero"}, {"endTime": "2021-10-11T13:45:31.239402+00:00", "severity": "Info", "startTime": "2021-10-11T11:05:28.091255+00:00", "type": "StepRandom"}], "type": "EraseSectors"}, {"assessment": true, "commandTimeout": 30, "currentPendingSectorCount": 0, "elapsed": 60, "length": "Short", "lifetime": 18720, "offlineUncorrectable": 0, "powerCycleCount": 2147, "reallocatedSectorCount": 0, "reportedUncorrectableErrors": 0, "severity": "Info", "status": "Completed without error", "type": "TestDataStorage"}, {"elapsed": 11, "readSpeed": 119.0, "type": "BenchmarkDataStorage", "writeSpeed": 32.7}], "interface": "ATA", "manufacturer": "Seagate", "model": "ST3500418AS", "serialNumber": "AAAAAAAA", "size": 500000.0, "type": "HardDrive", "variant": "CC46"}, {"actions": [{"elapsed": 0, "rate": 25540.36, "type": "BenchmarkProcessor"}, {"elapsed": 8, "rate": 7.6939, "type": "BenchmarkProcessorSysbench"}], "address": 64, "brand": "Core i5", "cores": 4, "generation": 3, "manufacturer": "Intel Corp.", "model": "Intel Core i5-3470 CPU @ 3.20GHz", "serialNumber": null, "speed": 1.6242180000000002, "threads": 4, "type": "Processor"}, {"actions": [], "manufacturer": "Intel Corporation", "memory": null, "model": "Xeon E3-1200 v2/3rd Gen Core processor Graphics Controller", "serialNumber": null, "type": "GraphicCard"}, {"actions": [], "biosDate": "2012-08-07T00:00:00", "firewire": 0, "manufacturer": "LENOVO", "model": "MAHOBAY", "pcmcia": 0, "ramMaxSize": 32, "ramSlots": 4, "serial": 1, "serialNumber": null, "slots": 4, "type": "Motherboard", "usb": 3, "version": "9SKT39AUS"}], "device": {"actions": [{"elapsed": 1, "rate": 0.6507, "type": "BenchmarkRamSysbench"}], "chassis": "Tower", "manufacturer": "LENOVO", "model": "3227A2G", "serialNumber": "AAAAAAAAD", "sku": "LENOVO_MT_3227", "type": "Desktop", "version": "ThinkCentre M92P"}, "elapsed": 187302510, "endTime": "2016-11-03T17:17:01.116554+00:00", "software": "Workbench", "type": "Snapshot", "uuid": "ae913de1-e639-476a-ad9b-78eabbe4625b", "version": "11.0b11"}

View file

@ -11,3 +11,7 @@ xlrd==2.0.1
odfpy==1.4.1 odfpy==1.4.1
pytz==2024.2 pytz==2024.2
json-repair==0.30.0 json-repair==0.30.0
setuptools==65.5.1
requests==2.32.3
wheel==0.45.1

View file

@ -17,8 +17,19 @@ HID_ALGO1 = [
"sku" "sku"
] ]
LEGACY_DPP = [
"manufacturer",
"model",
"chassis",
"serialNumber",
"sku",
"type",
"version"
]
ALGOS = { ALGOS = {
"hidalgo1": HID_ALGO1, "hidalgo1": HID_ALGO1,
"legacy_dpp": LEGACY_DPP
} }

View file

@ -18,20 +18,28 @@ class CustomFormatter(logging.Formatter):
color = PURPLE color = PURPLE
else: else:
color = RESET color = RESET
record.levelname = f"{color}{record.levelname}{RESET}" record.levelname = f"{color}{record.levelname}{RESET}"
if record.args: if record.args:
record.msg = self.highlight_args(record.msg, record.args, color) try:
record.args = () record.msg = record.msg % record.args
record.args = ()
# provide trace when DEBUG config except (TypeError, ValueError):
if settings.DEBUG: record.msg = f"{color}{record.msg}{RESET}"
import traceback
print(traceback.format_exc()) # Highlight the final formatted message
record.msg = self.highlight_message(record.msg, color)
# pedro says: I discovered that trace is provided anyway with
# this commented (reason: strange None msgs)
# is this needed?
### provide trace when DEBUG config
#if settings.DEBUG:
# import traceback
# print(traceback.format_exc())
return super().format(record) return super().format(record)
def highlight_args(self, message, args, color): def highlight_message(self, message, color):
highlighted_args = tuple(f"{color}{arg}{RESET}" for arg in args) return f"{color}{message}{RESET}"
return message % highlighted_args

View file

@ -19,7 +19,10 @@ def move_json(path_name, user, place="snapshots"):
def save_in_disk(data, user, place="snapshots"): def save_in_disk(data, user, place="snapshots"):
uuid = data.get('uuid', '') uuid = data.get("uuid")
if data.get("credentialSubject"):
uuid = data["credentialSubject"].get("uuid")
now = datetime.now() now = datetime.now()
year = now.year year = now.year
month = now.month month = now.month