Compare commits
88 commits
feature/tr
...
main
Author | SHA1 | Date | |
---|---|---|---|
pedro | 754820f631 | ||
01db6a3e8e | |||
Cayo Puigdefabregas | 9817d4eb45 | ||
Cayo Puigdefabregas | 0d60afaa35 | ||
Cayo Puigdefabregas | 918cf73506 | ||
9d963e7f54 | |||
pedro | 78de86c43a | ||
498da24ee6 | |||
67213e46d2 | |||
5ec44d3378 | |||
1d195806aa | |||
d8202fea0f | |||
e6fea8c1c3 | |||
Cayo Puigdefabregas | 782f6dac51 | ||
Cayo Puigdefabregas | 46bbb940d7 | ||
Cayo Puigdefabregas | abdefe885e | ||
Cayo Puigdefabregas | 1902647337 | ||
Cayo Puigdefabregas | 989120bd0a | ||
Cayo Puigdefabregas | dbdcffe5d9 | ||
pedro | 1ea400051c | ||
f5ea0f8d8d | |||
c5dd3a759d | |||
3bf23c5d3b | |||
Cayo Puigdefabregas | e54d8ba49c | ||
Cayo Puigdefabregas | 7f44d88b61 | ||
Cayo Puigdefabregas | b2e6406fb6 | ||
Cayo Puigdefabregas | b477431fcd | ||
Cayo Puigdefabregas | 2c21977c7c | ||
Cayo Puigdefabregas | 563e394afc | ||
f566bfdf03 | |||
Cayo Puigdefabregas | f04d425a31 | ||
Cayo Puigdefabregas | 8aefc90ece | ||
Cayo Puigdefabregas | 13249d564f | ||
Cayo Puigdefabregas | ddb764b3ea | ||
Cayo Puigdefabregas | 351c62b45d | ||
Cayo Puigdefabregas | 16a3a870be | ||
Cayo Puigdefabregas | 76c4b10fc4 | ||
Cayo Puigdefabregas | 1910609f68 | ||
Cayo Puigdefabregas | 53524643c8 | ||
Cayo Puigdefabregas | 9b7bbd6bcf | ||
Cayo Puigdefabregas | f7f6da5892 | ||
Cayo Puigdefabregas | 8643495a9d | ||
Cayo Puigdefabregas | dd327e5231 | ||
Cayo Puigdefabregas | cea5a279b7 | ||
Cayo Puigdefabregas | 594904905b | ||
74f0d5a507 | |||
Cayo Puigdefabregas | 6d53a2acda | ||
Cayo Puigdefabregas | e64e9b3c06 | ||
Cayo Puigdefabregas | f1c21af654 | ||
Cayo Puigdefabregas | 6efcf5ac18 | ||
Cayo Puigdefabregas | 5631da453a | ||
Cayo Puigdefabregas | 0c5adf87c6 | ||
dde4a114a0 | |||
Cayo Puigdefabregas | e791d0c63c | ||
833c458840 | |||
Cayo Puigdefabregas | e6424af251 | ||
Cayo Puigdefabregas | d31c5c7921 | ||
Cayo Puigdefabregas | 5432d22f35 | ||
Cayo Puigdefabregas | fdc431b43a | ||
ba6c955463 | |||
8eb89e602d | |||
80ab639759 | |||
162fc9a8ef | |||
eec14f4ffb | |||
3ab21b8c32 | |||
316184df47 | |||
7d6119c11a | |||
1248ab6894 | |||
41e4374612 | |||
6e7cb0bf84 | |||
09f6cd2e68 | |||
e25b8e5994 | |||
2793344684 | |||
f70c75a92f | |||
83719edc8e | |||
69bc95c0cd | |||
9863a59911 | |||
eb78d38c3a | |||
a49b31dd85 | |||
Cayo Puigdefabregas | 493c7636b2 | ||
Cayo Puigdefabregas | 88e036eb3c | ||
Cayo Puigdefabregas | b759c53e75 | ||
Cayo Puigdefabregas | 7ab88ad290 | ||
Cayo Puigdefabregas | f6d1cf719c | ||
fb836edfbb | |||
Cayo Puigdefabregas | cc350775ed | ||
Cayo Puigdefabregas | 0d574cae63 | ||
Cayo Puigdefabregas | 9857891b63 |
|
@ -1,8 +1,9 @@
|
|||
DOMAIN=localhost
|
||||
DH_DOMAIN=localhost
|
||||
DH_PORT=8000
|
||||
DEMO=true
|
||||
# note that with DEBUG=true, logs are more verbose (include tracebacks)
|
||||
DEBUG=true
|
||||
ALLOWED_HOSTS=localhost,localhost:8000,127.0.0.1,
|
||||
DPP=false
|
||||
|
||||
STATIC_ROOT=/tmp/static/
|
||||
MEDIA_ROOT=/tmp/media/
|
||||
|
@ -15,6 +16,7 @@ EMAIL_BACKEND="django.core.mail.backends.smtp.EmailBackend"
|
|||
EMAIL_FILE_PATH="/tmp/app-messages"
|
||||
ENABLE_EMAIL=false
|
||||
PREDEFINED_TOKEN='5018dd65-9abd-4a62-8896-80f34ac66150'
|
||||
DH_ALLOWED_HOSTS=${DH_DOMAIN},${DH_DOMAIN}:${DH_PORT},127.0.0.1,127.0.0.1:${DH_PORT}
|
||||
# TODO review these vars
|
||||
#SNAPSHOTS_DIR=/path/to/TODO
|
||||
#EVIDENCES_DIR=/path/to/TODO
|
||||
|
|
16
api/views.py
16
api/views.py
|
@ -85,17 +85,21 @@ class NewSnapshotView(ApiMixing):
|
|||
# except Exception:
|
||||
# return JsonResponse({'error': 'Invalid Snapshot'}, status=400)
|
||||
|
||||
if not data.get("uuid"):
|
||||
ev_uuid = data.get("uuid")
|
||||
if data.get("credentialSubject"):
|
||||
ev_uuid = data["credentialSubject"].get("uuid")
|
||||
|
||||
if not ev_uuid:
|
||||
txt = "error: the snapshot not have uuid"
|
||||
logger.error("%s", txt)
|
||||
return JsonResponse({'status': txt}, status=500)
|
||||
|
||||
exist_annotation = Annotation.objects.filter(
|
||||
uuid=data['uuid']
|
||||
uuid=ev_uuid
|
||||
).first()
|
||||
|
||||
if exist_annotation:
|
||||
txt = "error: the snapshot {} exist".format(data['uuid'])
|
||||
txt = "error: the snapshot {} exist".format(ev_uuid)
|
||||
logger.warning("%s", txt)
|
||||
return JsonResponse({'status': txt}, status=500)
|
||||
|
||||
|
@ -105,14 +109,14 @@ class NewSnapshotView(ApiMixing):
|
|||
except Exception as err:
|
||||
if settings.DEBUG:
|
||||
logger.exception("%s", err)
|
||||
snapshot_id = data.get("uuid", "")
|
||||
snapshot_id = ev_uuid
|
||||
txt = "It is not possible to parse snapshot: %s."
|
||||
logger.error(txt, snapshot_id)
|
||||
text = "fail: It is not possible to parse snapshot"
|
||||
return JsonResponse({'status': text}, status=500)
|
||||
|
||||
annotation = Annotation.objects.filter(
|
||||
uuid=data['uuid'],
|
||||
uuid=ev_uuid,
|
||||
type=Annotation.Type.SYSTEM,
|
||||
# TODO this is hardcoded, it should select the user preferred algorithm
|
||||
key="hidalgo1",
|
||||
|
@ -121,7 +125,7 @@ class NewSnapshotView(ApiMixing):
|
|||
|
||||
|
||||
if not annotation:
|
||||
logger.error("Error: No annotation for uuid: %s", data["uuid"])
|
||||
logger.error("Error: No annotation for uuid: %s", ev_uuid)
|
||||
return JsonResponse({'status': 'fail'}, status=500)
|
||||
|
||||
url_args = reverse_lazy("device:details", args=(annotation.value,))
|
||||
|
|
|
@ -69,7 +69,11 @@
|
|||
{{ dev.manufacturer }}
|
||||
</td>
|
||||
<td>
|
||||
{% if dev.version %}
|
||||
{{dev.version}} {{ dev.model }}
|
||||
{% else %}
|
||||
{{ dev.model }}
|
||||
{% endif %}
|
||||
</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
|
|
|
@ -84,8 +84,11 @@ class SearchView(InventaryMixin):
|
|||
return devices, count
|
||||
|
||||
def get_annotations(self, xp):
|
||||
snap = xp.document.get_data()
|
||||
uuid = json.loads(snap).get('uuid')
|
||||
snap = json.loads(xp.document.get_data())
|
||||
if snap.get("credentialSubject"):
|
||||
uuid = snap["credentialSubject"]["uuid"]
|
||||
else:
|
||||
uuid = snap["uuid"]
|
||||
return Device.get_annotation_from_uuid(uuid, self.request.user.institution)
|
||||
|
||||
def search_hids(self, query, offset, limit):
|
||||
|
|
|
@ -25,6 +25,7 @@ class Device:
|
|||
def __init__(self, *args, **kwargs):
|
||||
# the id is the chid of the device
|
||||
self.id = kwargs["id"]
|
||||
self.uuid = kwargs.get("uuid")
|
||||
self.pk = self.id
|
||||
self.shortid = self.pk[:6].upper()
|
||||
self.algorithm = None
|
||||
|
@ -103,11 +104,19 @@ class Device:
|
|||
self.evidences = [Evidence(u) for u in self.uuids]
|
||||
|
||||
def get_last_evidence(self):
|
||||
if self.last_evidence:
|
||||
return
|
||||
|
||||
if self.uuid:
|
||||
self.last_evidence = Evidence(self.uuid)
|
||||
return
|
||||
|
||||
annotations = self.get_annotations()
|
||||
if not annotations.count():
|
||||
return
|
||||
annotation = annotations.first()
|
||||
self.last_evidence = Evidence(annotation.uuid)
|
||||
self.uuid = annotation.uuid
|
||||
|
||||
def is_eraseserver(self):
|
||||
if not self.uuids:
|
||||
|
@ -126,6 +135,8 @@ class Device:
|
|||
return False
|
||||
|
||||
def last_uuid(self):
|
||||
if self.uuid:
|
||||
return self.uuid
|
||||
return self.uuids[0]
|
||||
|
||||
def get_lots(self):
|
||||
|
@ -263,45 +274,44 @@ class Device:
|
|||
|
||||
@property
|
||||
def is_websnapshot(self):
|
||||
if not self.last_evidence:
|
||||
self.get_last_evidence()
|
||||
self.get_last_evidence()
|
||||
return self.last_evidence.doc['type'] == "WebSnapshot"
|
||||
|
||||
@property
|
||||
def last_user_evidence(self):
|
||||
if not self.last_evidence:
|
||||
self.get_last_evidence()
|
||||
self.get_last_evidence()
|
||||
return self.last_evidence.doc['kv'].items()
|
||||
|
||||
@property
|
||||
def manufacturer(self):
|
||||
if not self.last_evidence:
|
||||
self.get_last_evidence()
|
||||
self.get_last_evidence()
|
||||
return self.last_evidence.get_manufacturer()
|
||||
|
||||
@property
|
||||
def serial_number(self):
|
||||
if not self.last_evidence:
|
||||
self.get_last_evidence()
|
||||
self.get_last_evidence()
|
||||
return self.last_evidence.get_serial_number()
|
||||
|
||||
@property
|
||||
def type(self):
|
||||
self.get_last_evidence()
|
||||
if self.last_evidence.doc['type'] == "WebSnapshot":
|
||||
return self.last_evidence.doc.get("device", {}).get("type", "")
|
||||
|
||||
if not self.last_evidence:
|
||||
self.get_last_evidence()
|
||||
return self.last_evidence.get_chassis()
|
||||
|
||||
@property
|
||||
def model(self):
|
||||
if not self.last_evidence:
|
||||
self.get_last_evidence()
|
||||
self.get_last_evidence()
|
||||
return self.last_evidence.get_model()
|
||||
|
||||
@property
|
||||
def components(self):
|
||||
def version(self):
|
||||
if not self.last_evidence:
|
||||
self.get_last_evidence()
|
||||
return self.last_evidence.get_version()
|
||||
|
||||
@property
|
||||
def components(self):
|
||||
self.get_last_evidence()
|
||||
return self.last_evidence.get_components()
|
||||
|
|
|
@ -29,6 +29,11 @@
|
|||
<li class="nav-item">
|
||||
<a href="#evidences" class="nav-link" data-bs-toggle="tab" data-bs-target="#evidences">{% trans 'Evidences' %}</a>
|
||||
</li>
|
||||
{% if dpps %}
|
||||
<li class="nav-item">
|
||||
<a href="#dpps" class="nav-link" data-bs-toggle="tab" data-bs-target="#dpps">{% trans 'Dpps' %}</a>
|
||||
</li>
|
||||
{% endif %}
|
||||
<li class="nav-item">
|
||||
<a class="nav-link" href="{% url 'device:device_web' object.id %}" target="_blank">Web</a>
|
||||
</li>
|
||||
|
@ -53,34 +58,41 @@
|
|||
</div>
|
||||
{% endif %}
|
||||
|
||||
<div class="row mb-3">
|
||||
<div class="row mb-1">
|
||||
<div class="col-lg-3 col-md-4 label">Type</div>
|
||||
<div class="col-lg-9 col-md-8">{{ object.type }}</div>
|
||||
</div>
|
||||
|
||||
{% if object.is_websnapshot and object.last_user_evidence %}
|
||||
{% for k, v in object.last_user_evidence %}
|
||||
<div class="row mb-3">
|
||||
<div class="row mb-1">
|
||||
<div class="col-lg-3 col-md-4 label">{{ k }}</div>
|
||||
<div class="col-lg-9 col-md-8">{{ v|default:'' }}</div>
|
||||
</div>
|
||||
{% endfor %}
|
||||
{% else %}
|
||||
<div class="row mb-3">
|
||||
<div class="row mb-1">
|
||||
<div class="col-lg-3 col-md-4 label">
|
||||
{% trans 'Manufacturer' %}
|
||||
</div>
|
||||
<div class="col-lg-9 col-md-8">{{ object.manufacturer|default:'' }}</div>
|
||||
</div>
|
||||
|
||||
<div class="row mb-3">
|
||||
<div class="row mb-1">
|
||||
<div class="col-lg-3 col-md-4 label">
|
||||
{% trans 'Model' %}
|
||||
</div>
|
||||
<div class="col-lg-9 col-md-8">{{ object.model|default:'' }}</div>
|
||||
</div>
|
||||
|
||||
<div class="row mb-3">
|
||||
<div class="row mb-1">
|
||||
<div class="col-lg-3 col-md-4 label">
|
||||
{% trans 'Version' %}
|
||||
</div>
|
||||
<div class="col-lg-9 col-md-8">{{ object.version|default:'' }}</div>
|
||||
</div>
|
||||
|
||||
<div class="row mb-1">
|
||||
<div class="col-lg-3 col-md-4 label">
|
||||
{% trans 'Serial Number' %}
|
||||
</div>
|
||||
|
@ -229,6 +241,25 @@
|
|||
{% endfor %}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{% if dpps %}
|
||||
<div class="tab-pane fade" id="dpps">
|
||||
<h5 class="card-title">{% trans 'List of dpps' %}</h5>
|
||||
<div class="list-group col">
|
||||
{% for d in dpps %}
|
||||
<div class="list-group-item">
|
||||
<div class="d-flex w-100 justify-content-between">
|
||||
<small class="text-muted">{{ d.timestamp }}</small>
|
||||
<span>{{ d.type }}</span>
|
||||
</div>
|
||||
<p class="mb-1">
|
||||
<a href="{% url 'did:device_web' d.signature %}">{{ d.signature }}</a>
|
||||
</p>
|
||||
</div>
|
||||
{% endfor %}
|
||||
</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
</div>
|
||||
{% endblock %}
|
||||
|
||||
|
@ -237,12 +268,12 @@
|
|||
document.addEventListener('DOMContentLoaded', function () {
|
||||
// Obtener el hash de la URL (ejemplo: #components)
|
||||
const hash = window.location.hash
|
||||
|
||||
|
||||
// Verificar si hay un hash en la URL
|
||||
if (hash) {
|
||||
// Buscar el botón o enlace que corresponde al hash y activarlo
|
||||
const tabTrigger = document.querySelector(`[data-bs-target="${hash}"]`)
|
||||
|
||||
|
||||
if (tabTrigger) {
|
||||
// Crear una instancia de tab de Bootstrap para activar el tab
|
||||
const tab = new bootstrap.Tab(tabTrigger)
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
import json
|
||||
from django.http import JsonResponse
|
||||
|
||||
from django.http import Http404
|
||||
from django.conf import settings
|
||||
from django.urls import reverse_lazy
|
||||
from django.shortcuts import get_object_or_404, Http404
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
@ -16,6 +14,9 @@ from evidence.models import Annotation
|
|||
from lot.models import LotTag
|
||||
from device.models import Device
|
||||
from device.forms import DeviceFormSet
|
||||
if settings.DPP:
|
||||
from dpp.models import Proof
|
||||
from dpp.api_dlt import PROOF_TYPE
|
||||
|
||||
|
||||
class NewDeviceView(DashboardView, FormView):
|
||||
|
@ -103,10 +104,17 @@ class DetailsView(DashboardView, TemplateView):
|
|||
context = super().get_context_data(**kwargs)
|
||||
self.object.initial()
|
||||
lot_tags = LotTag.objects.filter(owner=self.request.user.institution)
|
||||
dpps = []
|
||||
if settings.DPP:
|
||||
dpps = Proof.objects.filter(
|
||||
uuid__in=self.object.uuids,
|
||||
type=PROOF_TYPE["IssueDPP"]
|
||||
)
|
||||
context.update({
|
||||
'object': self.object,
|
||||
'snapshot': self.object.get_last_evidence(),
|
||||
'lot_tags': lot_tags,
|
||||
'dpps': dpps,
|
||||
})
|
||||
return context
|
||||
|
||||
|
|
|
@ -91,6 +91,11 @@ INSTALLED_APPS = [
|
|||
"api",
|
||||
]
|
||||
|
||||
DPP = config("DPP", default=False, cast=bool)
|
||||
|
||||
if DPP:
|
||||
INSTALLED_APPS.extend(["dpp", "did"])
|
||||
|
||||
|
||||
MIDDLEWARE = [
|
||||
"django.middleware.security.SecurityMiddleware",
|
||||
|
@ -239,3 +244,9 @@ LOGGING = {
|
|||
SNAPSHOT_PATH="/tmp/"
|
||||
DATA_UPLOAD_MAX_NUMBER_FILES = 1000
|
||||
COMMIT = config('COMMIT', default='')
|
||||
|
||||
# DLT SETTINGS
|
||||
TOKEN_DLT = config("API_DLT_TOKEN", default=None)
|
||||
API_DLT = config("API_DLT", default=None)
|
||||
API_RESOLVER = config("API_RESOLVER", default=None)
|
||||
ID_FEDERATED = config("ID_FEDERATED", default=None)
|
||||
|
|
|
@ -14,7 +14,7 @@ Including another URLconf
|
|||
1. Import the include() function: from django.urls import include, path
|
||||
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
|
||||
"""
|
||||
|
||||
from django.conf import settings
|
||||
from django.urls import path, include
|
||||
|
||||
urlpatterns = [
|
||||
|
@ -28,3 +28,9 @@ urlpatterns = [
|
|||
path("lot/", include("lot.urls")),
|
||||
path('api/', include('api.urls')),
|
||||
]
|
||||
|
||||
if settings.DPP:
|
||||
urlpatterns.extend([
|
||||
path('dpp/', include('dpp.urls')),
|
||||
path('did/', include('did.urls')),
|
||||
])
|
||||
|
|
0
did/__init__.py
Normal file
0
did/__init__.py
Normal file
3
did/admin.py
Normal file
3
did/admin.py
Normal file
|
@ -0,0 +1,3 @@
|
|||
from django.contrib import admin
|
||||
|
||||
# Register your models here.
|
6
did/apps.py
Normal file
6
did/apps.py
Normal file
|
@ -0,0 +1,6 @@
|
|||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class DidConfig(AppConfig):
|
||||
default_auto_field = "django.db.models.BigAutoField"
|
||||
name = "did"
|
0
did/migrations/__init__.py
Normal file
0
did/migrations/__init__.py
Normal file
3
did/models.py
Normal file
3
did/models.py
Normal file
|
@ -0,0 +1,3 @@
|
|||
from django.db import models
|
||||
|
||||
# Create your models here.
|
41
did/template_credential.py
Normal file
41
did/template_credential.py
Normal file
|
@ -0,0 +1,41 @@
|
|||
dpp_tmpl = {
|
||||
"@context": [
|
||||
"https://www.w3.org/ns/credentials/v2",
|
||||
"https://test.uncefact.org/vocabulary/untp/dpp/0.5.0/"
|
||||
],
|
||||
"type": [
|
||||
"DigitalProductPassport",
|
||||
"VerifiableCredential"
|
||||
],
|
||||
"id": "https://example.ereuse.org/credentials/2a423366-a0d6-4855-ba65-2e0c926d09b0",
|
||||
"issuer": {
|
||||
"type": [
|
||||
"CredentialIssuer"
|
||||
],
|
||||
"id": "did:web:r1.identifiers.ereuse.org:did-registry:z6Mkoreij5y9bD9fL5SGW6TfMUmcbaV7LCPwZHCFEEZBrVYQ#z6Mkoreij5y9bD9fL5SGW6TfMUmcbaV7LCPwZHCFEEZBrVYQ",
|
||||
"name": "Refurbisher One"
|
||||
},
|
||||
"validFrom": "2024-11-15T12:00:00",
|
||||
"validUntil": "2034-11-15T12:00:00",
|
||||
"credentialSubject": {
|
||||
"type": [
|
||||
"Product"
|
||||
],
|
||||
"id": "https://id.ereuse.org/01/09520123456788/21/12345",
|
||||
"name": "Refurbished XYZ Lenovo laptop item",
|
||||
"registeredId": "09520123456788.21.12345",
|
||||
"description": "XYZ Lenovo laptop refurbished by Refurbisher One",
|
||||
"data": ""
|
||||
},
|
||||
"credentialSchema": {
|
||||
"id": "https://idhub.pangea.org/vc_schemas/dpp.json",
|
||||
"type": "FullJsonSchemaValidator2021",
|
||||
"proof": {
|
||||
"type": "Ed25519Signature2018",
|
||||
"proofPurpose": "assertionMethod",
|
||||
"verificationMethod": "did:web:r1.identifiers.ereuse.org:did-registry:z6Mkoreij5y9bD9fL5SGW6TfMUmcbaV7LCPwZHCFEEZBrVYQ#z6Mkoreij5y9bD9fL5SGW6TfMUmcbaV7LCPwZHCFEEZBrVYQ",
|
||||
"created": "2024-12-03T15:33:42Z",
|
||||
"jws": "eyJhbGciOiJFZERTQSIsImNyaXQiOlsiYjY0Il0sImI2NCI6ZmFsc2V9..rBPqbOcZCXB7GAnq1XIfV9Jvw4MKXlHff7qZkRfgwQ0Hnd9Ujt5s1xT4O0K6VESzWvdP2mOvMvu780fVNfraBQ"
|
||||
}
|
||||
}
|
||||
}
|
497
did/templates/device_did.html
Normal file
497
did/templates/device_did.html
Normal file
|
@ -0,0 +1,497 @@
|
|||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8" />
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
|
||||
<title>{{ object.type }}</title>
|
||||
<link href="https://cdnjs.cloudflare.com/ajax/libs/bootstrap/5.1.3/css/bootstrap.min.css" rel="stylesheet" />
|
||||
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap-icons@1.11.1/font/bootstrap-icons.css" />
|
||||
<style>
|
||||
body {
|
||||
font-size: 0.875rem;
|
||||
background-color: #f8f9fa;
|
||||
display: flex;
|
||||
flex-direction: column;
|
||||
min-height: 100vh;
|
||||
}
|
||||
.custom-container {
|
||||
background-color: #ffffff;
|
||||
border-radius: 10px;
|
||||
box-shadow: 0 0 20px rgba(0, 0, 0, 0.1);
|
||||
padding: 30px;
|
||||
margin-top: 30px;
|
||||
flex-grow: 1;
|
||||
}
|
||||
.section-title {
|
||||
color: #7a9f4f;
|
||||
border-bottom: 2px solid #9cc666;
|
||||
padding-bottom: 10px;
|
||||
margin-bottom: 20px;
|
||||
font-size: 1.5em;
|
||||
}
|
||||
.info-row {
|
||||
margin-bottom: 10px;
|
||||
}
|
||||
.info-label {
|
||||
font-weight: bold;
|
||||
color: #545f71;
|
||||
}
|
||||
.info-value {
|
||||
color: #333;
|
||||
}
|
||||
.component-card {
|
||||
background-color: #f8f9fa;
|
||||
border-left: 4px solid #9cc666;
|
||||
margin-bottom: 15px;
|
||||
transition: all 0.3s ease;
|
||||
}
|
||||
.component-card:hover {
|
||||
box-shadow: 0 5px 15px rgba(0, 0, 0, 0.1);
|
||||
transform: translateY(-2px);
|
||||
}
|
||||
.hash-value {
|
||||
word-break: break-all;
|
||||
background-color: #f3f3f3;
|
||||
padding: 5px;
|
||||
border-radius: 4px;
|
||||
font-family: monospace;
|
||||
font-size: 0.9em;
|
||||
border: 1px solid #e0e0e0;
|
||||
}
|
||||
.card-title {
|
||||
color: #9cc666;
|
||||
}
|
||||
.btn-primary {
|
||||
background-color: #9cc666;
|
||||
border-color: #9cc666;
|
||||
padding: 0.1em 2em;
|
||||
font-weight: 700;
|
||||
}
|
||||
.btn-primary:hover {
|
||||
background-color: #8ab555;
|
||||
border-color: #8ab555;
|
||||
}
|
||||
.btn-green-user {
|
||||
background-color: #c7e3a3;
|
||||
}
|
||||
.btn-grey {
|
||||
background-color: #f3f3f3;
|
||||
}
|
||||
footer {
|
||||
background-color: #545f71;
|
||||
color: #ffffff;
|
||||
text-align: center;
|
||||
padding: 10px 0;
|
||||
margin-top: 20px;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<div class="container custom-container">
|
||||
<nav class="header-nav ms-auto">
|
||||
<div class="d-flex align-items-right">
|
||||
<span class="nav-item">
|
||||
{% if not roles and user.is_anonymous %}
|
||||
<button class="btn btn-primary" data-bs-toggle="modal" data-bs-target="#validateModal">Validate</button>
|
||||
{% else %}
|
||||
<button class="btn btn-primary" id="buttonRole" data-bs-toggle="modal" data-bs-target="#rolesModal">Select your role</button>
|
||||
<a class="btn btn-primary" href="{% url 'login:logout' %}?next={{ path }}">Logout</a>
|
||||
{% endif %}
|
||||
</span>
|
||||
</div>
|
||||
{% if role %}
|
||||
<div class="d-flex justify-content-end">
|
||||
<span class="nav-item">
|
||||
Current Role: {{ role }}
|
||||
</span>
|
||||
</div>
|
||||
{% endif %}
|
||||
</nav>
|
||||
|
||||
<h1 class="text-center mb-4" style="color: #545f71;">{{ object.manufacturer }} {{ object.type }} {{ object.model }}</h1>
|
||||
|
||||
<div class="row">
|
||||
<div class="col-lg-6">
|
||||
{% if manuals.details.logo %}
|
||||
<img style="max-width: 50px; margin-right: 15px;" src="{{ manuals.details.logo }}" />
|
||||
{% endif %}
|
||||
</div>
|
||||
<div class="col-lg-6">
|
||||
{% if manuals.details.image %}
|
||||
<img style="width: 100px;" src="{{ manuals.details.image }}" />
|
||||
{% endif %}
|
||||
</div>
|
||||
</div>
|
||||
<div class="row">
|
||||
<div class="col-lg-6">
|
||||
<h2 class="section-title">Details</h2>
|
||||
<div class="info-row row">
|
||||
<div class="col-md-4 info-label">Phid</div>
|
||||
<div class="col-md-8 info-value">
|
||||
<div class="hash-value">{{ object.id }}</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="info-row row">
|
||||
<div class="col-md-4 info-label">Type</div>
|
||||
<div class="col-md-8 info-value">{{ object.type }}</div>
|
||||
</div>
|
||||
|
||||
{% if object.is_websnapshot %}
|
||||
{% for snapshot_key, snapshot_value in object.last_user_evidence %}
|
||||
<div class="info-row row">
|
||||
<div class="col-md-4 info-label">{{ snapshot_key }}</div>
|
||||
<div class="col-md-8 info-value">{{ snapshot_value|default:'' }}</div>
|
||||
</div>
|
||||
{% endfor %}
|
||||
{% else %}
|
||||
<div class="info-row row">
|
||||
<div class="col-md-4 info-label">Manufacturer</div>
|
||||
<div class="col-md-8 info-value">{{ object.manufacturer|default:'' }}</div>
|
||||
</div>
|
||||
<div class="info-row row">
|
||||
<div class="col-md-4 info-label">Model</div>
|
||||
<div class="col-md-8 info-value">{{ object.model|default:'' }}</div>
|
||||
</div>
|
||||
{% if user.is_authenticated %}
|
||||
<div class="info-row row">
|
||||
<div class="col-md-4 info-label">Serial Number</div>
|
||||
<div class="col-md-8 info-value">{{ object.serial_number|default:'' }}</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
</div>
|
||||
|
||||
<div class="col-lg-6">
|
||||
<h2 class="section-title">Identifiers</h2>
|
||||
{% for chid in object.hids %}
|
||||
<div class="info-row">
|
||||
<div class="hash-value">{{ chid|default:'' }}</div>
|
||||
</div>
|
||||
{% endfor %}
|
||||
</div>
|
||||
</div>
|
||||
<h2 class="section-title mt-5">Components</h2>
|
||||
<div class="row">
|
||||
{% for component in object.components %}
|
||||
<div class="col-md-6 mb-3">
|
||||
<div class="card component-card">
|
||||
<div class="card-body">
|
||||
<h5 class="card-title">{{ component.type }}</h5>
|
||||
<p class="card-text">
|
||||
{% for component_key, component_value in component.items %}
|
||||
{% if component_key not in 'actions,type' %}
|
||||
{% if component_key != 'serialNumber' or user.is_authenticated %}
|
||||
<strong>{{ component_key }}:</strong> {{ component_value }}<br />
|
||||
{% endif %}
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
{% endfor %}
|
||||
</div>
|
||||
{% if manuals.icecat %}
|
||||
<h5 class="card-title">Icecat data sheet</h5>
|
||||
<div class="row">
|
||||
<div class="col-12 list-group-item d-flex align-items-center">
|
||||
{% if manuals.details.logo %}
|
||||
<img style="max-width: 50px; margin-right: 15px;" src="{{ manuals.details.logo }}" />
|
||||
{% endif %}
|
||||
{% if manuals.details.image %}
|
||||
<img style="max-width: 100px; margin-right: 15px;" src="{{ manuals.details.image }}" />
|
||||
{% endif %}
|
||||
{% if manuals.details.pdf %}
|
||||
<a href="{{ manuals.details.pdf }}" target="_blank">{{ manuals.details.title }}</a><br />
|
||||
{% else %}
|
||||
{{ manuals.details.title }}<br />
|
||||
{% endif %}
|
||||
</div>
|
||||
<div class="col-12 accordion-item">
|
||||
<h5 class="card-title accordion-header">
|
||||
<button class="accordion-button collapsed" data-bs-target="#manuals-icecat" type="button"
|
||||
data-bs-toggle="collapse" aria-expanded="false">
|
||||
More examples
|
||||
</button>
|
||||
</h5>
|
||||
<div id="manuals-icecat" class="row accordion-collapse collapse">
|
||||
<div class="accordion-body">
|
||||
{% for m in manuals.icecat %}
|
||||
<div class="list-group-item d-flex align-items-center">
|
||||
{% if m.logo %}
|
||||
<img style="max-width: 50px; margin-right: 15px;" src="{{ m.logo }}" />
|
||||
{% endif %}
|
||||
{% if m.pdf %}
|
||||
<a href="{{ m.pdf }}" target="_blank">{{ m.title }}</a><br />
|
||||
{% else %}
|
||||
{{ m.title }}<br />
|
||||
{% endif %}
|
||||
</div>
|
||||
{% endfor %}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
{% if manuals.laer %}
|
||||
<div class="row mt-3">
|
||||
<div class="col-12">
|
||||
<h5 class="card-title">Recycled Content</h5>
|
||||
|
||||
<div class="row mb-3">
|
||||
<div class="col-sm-2">
|
||||
Metal
|
||||
</div>
|
||||
<div class="col-sm-10">
|
||||
<div class="progress">
|
||||
|
||||
<div class="progress-bar"
|
||||
role="progressbar"
|
||||
style="width: {{ manuals.laer.0.metal }}%"
|
||||
aria-valuenow="{{ manuals.laer.0.metal }}"
|
||||
aria-valuemin="0"
|
||||
aria-valuemax="100">{{ manuals.laer.0.metal }}%
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="row mb-3">
|
||||
<div class="col-sm-2">
|
||||
Plastic post Consumer
|
||||
</div>
|
||||
<div class="col-sm-10">
|
||||
<div class="progress">
|
||||
<div class="progress-bar"
|
||||
role="progressbar"
|
||||
style="width: {{ manuals.laer.0.plastic_post_consumer }}%"
|
||||
aria-valuenow="{{ manuals.laer.0.plastic_post_consumer }}"
|
||||
aria-valuemin="0"
|
||||
aria-valuemax="100">{{ manuals.laer.0.plastic_post_consumer }}%
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div class="row mb-3">
|
||||
<div class="col-sm-2">
|
||||
Plastic post Industry
|
||||
</div>
|
||||
<div class="col-sm-10">
|
||||
<div class="progress">
|
||||
<div class="progress-bar"
|
||||
role="progressbar"
|
||||
style="width: {{ manuals.laer.0.plastic_post_industry }}%"
|
||||
aria-valuenow="{{ manuals.laer.0.plastic_post_industry }}"
|
||||
aria-valuemin="0"
|
||||
aria-valuemax="100">{{ manuals.laer.0.plastic_post_industry }}%
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if manuals.energystar %}
|
||||
<div class="row mt-3">
|
||||
<div class="col-12">
|
||||
<h5 class="card-title">Energy spent</h5>
|
||||
|
||||
{% if manuals.energystar.long_idle_watts %}
|
||||
<div class="row mb-3">
|
||||
<div class="col-sm-10">
|
||||
Consumption when inactivity power function is activated (watts)
|
||||
</div>
|
||||
<div class="col-sm-2">
|
||||
{{ manuals.energystar.long_idle_watts }}
|
||||
</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if manuals.energystar.short_idle_watts %}
|
||||
<div class="row mb-3">
|
||||
<div class="col-sm-10">
|
||||
Consumption when inactivity power function is not activated (watts)
|
||||
</div>
|
||||
<div class="col-sm-2">
|
||||
{{ manuals.energystar.short_idle_watts }}
|
||||
</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if manuals.energystar.sleep_mode_watts %}
|
||||
<div class="row mb-3">
|
||||
<div class="col-sm-10">
|
||||
sleep_mode_watts
|
||||
Consumption when computer goes into sleep mode (watts)
|
||||
</div>
|
||||
<div class="col-sm-2">
|
||||
{{ manuals.energystar.sleep_mode_watts }}
|
||||
</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if manuals.energystar.off_mode_watts %}
|
||||
<div class="row mb-3">
|
||||
<div class="col-sm-10">
|
||||
Consumption when the computer is off (watts)
|
||||
</div>
|
||||
<div class="col-sm-2">
|
||||
{{ manuals.energystar.off_mode_watts }}
|
||||
</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if manuals.energystar.tec_allowance_kwh %}
|
||||
<div class="row mb-3">
|
||||
<div class="col-sm-10">
|
||||
Power allocation for normal operation (kwh)
|
||||
</div>
|
||||
<div class="col-sm-2">
|
||||
{{ manuals.energystar.tec_allowance_kwh }}
|
||||
</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if manuals.energystar.tec_of_model_kwh %}
|
||||
<div class="row mb-3">
|
||||
<div class="col-sm-10">
|
||||
Consumption of the model configuration (kwh)
|
||||
</div>
|
||||
<div class="col-sm-2">
|
||||
{{ manuals.energystar.tec_of_model_kwh }}
|
||||
</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if manuals.energystar.tec_requirement_kwh %}
|
||||
<div class="row mb-3">
|
||||
<div class="col-sm-10">
|
||||
Energy allowance provided (kwh)
|
||||
</div>
|
||||
<div class="col-sm-2">
|
||||
{{ manuals.energystar.tec_requirement_kwh }}
|
||||
</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if manuals.energystar.work_off_mode_watts %}
|
||||
<div class="row mb-3">
|
||||
<div class="col-sm-10">
|
||||
The lowest power mode which cannot be switched off (watts)
|
||||
</div>
|
||||
<div class="col-sm-2">
|
||||
{{ manuals.energystar.work_off_mode_watts }}
|
||||
</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
{% if manuals.energystar.work_weighted_power_of_model_watts %}
|
||||
<div class="row mb-3">
|
||||
<div class="col-sm-10">
|
||||
Weighted energy consumption from all its states (watts)
|
||||
</div>
|
||||
<div class="col-sm-2">
|
||||
{{ manuals.energystar.work_weighted_power_of_model_watts }}
|
||||
</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
|
||||
{% if manuals.ifixit %}
|
||||
<div class="row">
|
||||
<div class="col-12 accordion-item">
|
||||
<h5 class="card-title accordion-header">
|
||||
<button class="accordion-button collapsed" data-bs-target="#manuals-repair" type="button"
|
||||
data-bs-toggle="collapse" aria-expanded="false">
|
||||
Repair manuals
|
||||
</button>
|
||||
</h5>
|
||||
<div id="manuals-repair" class="row accordion-collapse collapse">
|
||||
<div class="list-group col">
|
||||
{% for m in manuals.ifixit %}
|
||||
<div class="list-group-item d-flex align-items-center">
|
||||
{% if m.image %}
|
||||
<img style="max-width: 100px; margin-right: 15px;" src="{{ m.image }}" />
|
||||
{% endif %}
|
||||
{% if m.url %}
|
||||
<a href="{{ m.url }}" target="_blank">{{ m.title }}</a><br />
|
||||
{% else %}
|
||||
{{ m.title }}<br />
|
||||
{% endif %}
|
||||
</div>
|
||||
{% endfor %}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
<footer>
|
||||
<p>
|
||||
©{% now 'Y' %}eReuse. All rights reserved.
|
||||
</p>
|
||||
</footer>
|
||||
{% if user.is_anonymous and not roles %}
|
||||
<div class="modal fade" id="validateModal" tabindex="-1" style="display: none;" aria-hidden="true">
|
||||
<div class="modal-dialog modal-dialog-centered">
|
||||
<div class="modal-content">
|
||||
|
||||
<div class="modal-header">
|
||||
<h5 class="modal-title">Validate as <span id="title-action"></span></h5>
|
||||
<button type="button" class="btn-close" data-bs-dismiss="modal" aria-label="Close"></button>
|
||||
</div>
|
||||
|
||||
<div class="modal-body">
|
||||
<a class="btn btn-primary" type="button"
|
||||
href="{% url 'login:login' %}?next={{ path }}">
|
||||
User of system
|
||||
</a>
|
||||
{% if oidc %}
|
||||
<br />
|
||||
<a class="btn btn-primary mt-3" type="button" href="{# url 'oidc:login_other_inventory' #}?next={{ path }}">
|
||||
User of other inventory
|
||||
</a>
|
||||
{% endif %}
|
||||
</div>
|
||||
|
||||
<div class="modal-footer"></div>
|
||||
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
{% else %}
|
||||
<div class="modal fade" id="rolesModal" tabindex="-1" style="display: none;" aria-hidden="true">
|
||||
<div class="modal-dialog modal-dialog-centered">
|
||||
<div class="modal-content">
|
||||
|
||||
<form action="{{ path }}" method="get">
|
||||
<div class="modal-header">
|
||||
<h5 class="modal-title">Select your Role <span id="title-action"></span></h5>
|
||||
<button type="button" class="btn-close" data-bs-dismiss="modal" aria-label="Close"></button>
|
||||
</div>
|
||||
|
||||
<div class="modal-body">
|
||||
<select name="role">
|
||||
{% for k, v in roles %}
|
||||
<option value="{{ k }}" {% if v == role %}selected=selected{% endif %}>{{ v }}</option>
|
||||
{% endfor %}
|
||||
</select>
|
||||
</div>
|
||||
|
||||
<div class="modal-footer">
|
||||
<button type="button" class="btn btn-secondary" data-bs-dismiss="modal">Close</button>
|
||||
<input type="submit" class="btn btn-primary" value="Send" />
|
||||
</div>
|
||||
</form>
|
||||
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
<script src="https://cdnjs.cloudflare.com/ajax/libs/bootstrap/5.1.3/js/bootstrap.bundle.min.js"></script>
|
||||
</body>
|
||||
</html>
|
3
did/tests.py
Normal file
3
did/tests.py
Normal file
|
@ -0,0 +1,3 @@
|
|||
from django.test import TestCase
|
||||
|
||||
# Create your tests here.
|
8
did/urls.py
Normal file
8
did/urls.py
Normal file
|
@ -0,0 +1,8 @@
|
|||
from django.urls import path
|
||||
from did import views
|
||||
|
||||
app_name = 'did'
|
||||
|
||||
urlpatterns = [
|
||||
path("<str:pk>", views.PublicDeviceWebView.as_view(), name="device_web"),
|
||||
]
|
263
did/views.py
Normal file
263
did/views.py
Normal file
|
@ -0,0 +1,263 @@
|
|||
import json
|
||||
import logging
|
||||
|
||||
from django.http import JsonResponse, Http404
|
||||
from django.views.generic.base import TemplateView
|
||||
from device.models import Device
|
||||
from evidence.parse import Build
|
||||
from dpp.api_dlt import ALGORITHM
|
||||
from dpp.models import Proof
|
||||
from dpp.api_dlt import PROOF_TYPE
|
||||
from did.template_credential import dpp_tmpl
|
||||
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
|
||||
class PublicDeviceWebView(TemplateView):
|
||||
template_name = "device_did.html"
|
||||
|
||||
def get(self, request, *args, **kwargs):
|
||||
self.pk = kwargs['pk']
|
||||
chid = self.pk.split(":")[0]
|
||||
proof = Proof.objects.filter(signature=self.pk).first()
|
||||
if proof:
|
||||
self.object = Device(id=chid, uuid=proof.uuid)
|
||||
else:
|
||||
self.object = Device(id=chid)
|
||||
|
||||
if not self.object.last_evidence:
|
||||
raise Http404
|
||||
|
||||
if self.request.headers.get('Accept') == 'application/json':
|
||||
return self.get_json_response()
|
||||
return super().get(request, *args, **kwargs)
|
||||
|
||||
def get_context_data(self, **kwargs):
|
||||
self.context = super().get_context_data(**kwargs)
|
||||
self.object.initial()
|
||||
roles = [("Operator", "Operator")]
|
||||
role = "Operator"
|
||||
if self.request.user.is_anonymous:
|
||||
roles = []
|
||||
role = None
|
||||
self.context.update({
|
||||
'object': self.object,
|
||||
'role': role,
|
||||
'roles': roles,
|
||||
'path': self.request.path,
|
||||
'last_dpp': "",
|
||||
'before_dpp': "",
|
||||
})
|
||||
if not self.request.user.is_anonymous:
|
||||
self.get_manuals()
|
||||
return self.context
|
||||
|
||||
@property
|
||||
def public_fields(self):
|
||||
return {
|
||||
'id': self.object.id,
|
||||
'shortid': self.object.shortid,
|
||||
'uuids': self.object.uuids,
|
||||
'hids': self.object.hids,
|
||||
'components': self.remove_serial_number_from(self.object.components),
|
||||
}
|
||||
|
||||
@property
|
||||
def authenticated_fields(self):
|
||||
return {
|
||||
'serial_number': self.object.serial_number,
|
||||
'components': self.object.components,
|
||||
}
|
||||
|
||||
def remove_serial_number_from(self, components):
|
||||
for component in components:
|
||||
if 'serial_number' in component:
|
||||
del component['SerialNumber']
|
||||
return components
|
||||
|
||||
def get_device_data(self):
|
||||
data = self.public_fields
|
||||
if self.request.user.is_authenticated:
|
||||
data.update(self.authenticated_fields)
|
||||
return data
|
||||
|
||||
def get_json_response(self):
|
||||
device_data = self.get_result()
|
||||
# device_data = self.get_device_data()
|
||||
response = JsonResponse(device_data)
|
||||
response["Access-Control-Allow-Origin"] = "*"
|
||||
return response
|
||||
|
||||
def get_result(self):
|
||||
|
||||
if len(self.pk.split(":")) > 1:
|
||||
return self.build_from_dpp()
|
||||
else:
|
||||
return self.build_from_chid()
|
||||
|
||||
def build_from_dpp(self):
|
||||
data = {
|
||||
'document': {},
|
||||
'dpp': self.pk,
|
||||
'algorithm': ALGORITHM,
|
||||
'components': [],
|
||||
'manufacturer DPP': '',
|
||||
'device': {},
|
||||
}
|
||||
dev = Build(self.object.last_evidence.doc, None, check=True)
|
||||
doc = dev.get_phid()
|
||||
data['document'] = json.dumps(doc)
|
||||
data['device'] = dev.device
|
||||
data['components'] = dev.components
|
||||
|
||||
self.object.get_evidences()
|
||||
last_dpp = Proof.objects.filter(
|
||||
uuid__in=self.object.uuids, type=PROOF_TYPE['IssueDPP']
|
||||
).order_by("-timestamp").first()
|
||||
|
||||
key = self.pk
|
||||
if last_dpp:
|
||||
key = last_dpp.signature
|
||||
|
||||
url = "https://{}/did/{}".format(
|
||||
self.request.get_host(),
|
||||
key
|
||||
)
|
||||
data['url_last'] = url
|
||||
tmpl = dpp_tmpl.copy()
|
||||
tmpl["credentialSubject"]["data"] = data
|
||||
return tmpl
|
||||
|
||||
def build_from_chid(self):
|
||||
dpps = []
|
||||
self.object.initial()
|
||||
for d in self.object.evidences:
|
||||
d.get_doc()
|
||||
dev = Build(d.doc, None, check=True)
|
||||
doc = dev.get_phid()
|
||||
ev = json.dumps(doc)
|
||||
phid = dev.get_signature(doc)
|
||||
dpp = "{}:{}".format(self.pk, phid)
|
||||
rr = {
|
||||
'dpp': dpp,
|
||||
'document': ev,
|
||||
'algorithm': ALGORITHM,
|
||||
'manufacturer DPP': '',
|
||||
'device': dev.device,
|
||||
'components': dev.components
|
||||
}
|
||||
|
||||
tmpl = dpp_tmpl.copy()
|
||||
tmpl["credentialSubject"]["data"] = rr
|
||||
|
||||
dpps.append(tmpl)
|
||||
return {
|
||||
'@context': ['https://ereuse.org/dpp0.json'],
|
||||
'data': dpps,
|
||||
}
|
||||
|
||||
def get_manuals(self):
|
||||
manuals = {
|
||||
'ifixit': [],
|
||||
'icecat': [],
|
||||
'details': {},
|
||||
'laer': [],
|
||||
'energystar': {},
|
||||
}
|
||||
try:
|
||||
params = {
|
||||
"manufacturer": self.object.manufacturer,
|
||||
"model": self.object.model,
|
||||
}
|
||||
self.params = json.dumps(params)
|
||||
manuals['ifixit'] = self.request_manuals('ifixit')
|
||||
manuals['icecat'] = self.request_manuals('icecat')
|
||||
manuals['laer'] = self.request_manuals('laer')
|
||||
manuals['energystar'] = self.request_manuals('energystar') or {}
|
||||
if manuals['icecat']:
|
||||
manuals['details'] = manuals['icecat'][0]
|
||||
except Exception as err:
|
||||
logger.error("Error: {}".format(err))
|
||||
|
||||
self.context['manuals'] = manuals
|
||||
self.parse_energystar()
|
||||
|
||||
def parse_energystar(self):
|
||||
if not self.context.get('manuals', {}).get('energystar'):
|
||||
return
|
||||
|
||||
# Defined in:
|
||||
# https://dev.socrata.com/foundry/data.energystar.gov/j7nq-iepp
|
||||
|
||||
energy_types = [
|
||||
'functional_adder_allowances_kwh',
|
||||
'tec_allowance_kwh',
|
||||
'long_idle_watts',
|
||||
'short_idle_watts',
|
||||
'off_mode_watts',
|
||||
'sleep_mode_watts',
|
||||
'tec_of_model_kwh',
|
||||
'tec_requirement_kwh',
|
||||
'work_off_mode_watts',
|
||||
'work_weighted_power_of_model_watts',
|
||||
]
|
||||
energy = {}
|
||||
for field in energy_types:
|
||||
energy[field] = []
|
||||
|
||||
for e in self.context['manuals']['energystar']:
|
||||
for field in energy_types:
|
||||
for k, v in e.items():
|
||||
if not v:
|
||||
continue
|
||||
if field in k:
|
||||
energy[field].append(v)
|
||||
|
||||
for k, v in energy.items():
|
||||
if not v:
|
||||
energy[k] = 0
|
||||
continue
|
||||
tt = sum([float(i) for i in v])
|
||||
energy[k] = round(tt / len(v), 2)
|
||||
|
||||
self.context['manuals']['energystar'] = energy
|
||||
|
||||
def request_manuals(self, prefix):
|
||||
#TODO reimplement manuals service
|
||||
response = {
|
||||
"laer": [{"metal": 40, "plastic_post_consumer": 27, "plastic_post_industry": 34}],
|
||||
"energystar": [{
|
||||
'functional_adder_allowances_kwh': 180,
|
||||
"long_idle_watts": 240,
|
||||
"short_idle_watts": 120,
|
||||
"sleep_mode_watts": 30,
|
||||
"off_mode_watts": 3,
|
||||
"tec_allowance_kwh": 180,
|
||||
"tec_of_model_kwh": 150,
|
||||
"tec_requirement_kwh": 220,
|
||||
"work_off_mode_watts": 70,
|
||||
"work_weighted_power_of_model_watts": 240
|
||||
}],
|
||||
"ifixit": [
|
||||
{
|
||||
"image": "https://guide-images.cdn.ifixit.com/igi/156EpI4YdQeVfVPa.medium",
|
||||
"url": "https://es.ifixit.com/Gu%C3%ADa/HP+ProBook+450+G4+Back+Panel+Replacement/171196?lang=en",
|
||||
"title": "HP ProBook 450 G4 Back Panel Replacement"
|
||||
},
|
||||
{
|
||||
"image": "https://guide-images.cdn.ifixit.com/igi/usTIqCKpuxVWC3Ix.140x105",
|
||||
"url": "https://es.ifixit.com/Gu%C3%ADa/HP+ProBook+450+G4+Display+Assembly+Replacement/171101?lang=en",
|
||||
"title": "Display Assembly Replacement"
|
||||
}
|
||||
],
|
||||
"icecat": [
|
||||
{
|
||||
"logo": "https://images.icecat.biz/img/brand/thumb/1_cf8603f6de7b4c4d8ac4f5f0ef439a05.jpg",
|
||||
"image": "https://guide-images.cdn.ifixit.com/igi/Q2nYjTIQfG6GaI5B.standard",
|
||||
"pdf": "https://icecat.biz/rest/product-pdf?productId=32951710&lang=en",
|
||||
"title": "HP ProBook 450 G3"
|
||||
}
|
||||
]
|
||||
}
|
||||
return response.get(prefix, {})
|
|
@ -5,12 +5,14 @@ services:
|
|||
dockerfile: docker/devicehub-django.Dockerfile
|
||||
environment:
|
||||
- DEBUG=${DEBUG:-false}
|
||||
- DOMAIN=${DOMAIN:-localhost}
|
||||
- ALLOWED_HOSTS=${ALLOWED_HOSTS:-$DOMAIN}
|
||||
- DOMAIN=${DH_DOMAIN:-localhost}
|
||||
- PORT=${DH_PORT:-8000}
|
||||
- ALLOWED_HOSTS=${DH_ALLOWED_HOSTS:-$DH_DOMAIN}
|
||||
- DEMO=${DEMO:-false}
|
||||
- PREDEFINED_TOKEN=${PREDEFINED_TOKEN:-}
|
||||
- DPP=${DPP:-false}
|
||||
volumes:
|
||||
- .:/opt/devicehub-django
|
||||
ports:
|
||||
- 8000:8000
|
||||
- ${DH_PORT}:${DH_PORT}
|
||||
|
||||
|
|
|
@ -20,7 +20,9 @@ main() {
|
|||
echo "WARNING: .env was not there, .env.example was copied, this only happens once"
|
||||
fi
|
||||
# remove old database
|
||||
sudo rm -vfr ./db/*
|
||||
rm -vfr ./db/*
|
||||
# deactivate configured flag
|
||||
rm -vfr ./already_configured
|
||||
docker compose down -v
|
||||
docker compose build
|
||||
docker compose up ${detach_arg:-}
|
||||
|
|
|
@ -7,8 +7,14 @@ RUN apt update && \
|
|||
git \
|
||||
sqlite3 \
|
||||
jq \
|
||||
time \
|
||||
vim \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# TODO I don't like this, but the whole ereuse-dpp works with user 1000 because of the volume mapping
|
||||
# thanks https://stackoverflow.com/questions/70520205/docker-non-root-user-best-practices-for-python-images
|
||||
RUN adduser --home /opt/devicehub-django -u 1000 app
|
||||
|
||||
WORKDIR /opt/devicehub-django
|
||||
|
||||
# reduce size (python specifics) -> src https://stackoverflow.com/questions/74616667/removing-pip-cache-after-installing-dependencies-in-docker-image
|
||||
|
@ -22,15 +28,18 @@ compile = no
|
|||
no-cache-dir = True
|
||||
END
|
||||
|
||||
# upgrade pip, which might fail on lxc, then remove the "corrupted file"
|
||||
RUN python -m pip install --upgrade pip || (rm -rf /usr/local/lib/python3.11/site-packages/pip-*.dist-info && python -m pip install --upgrade pip)
|
||||
|
||||
COPY ./requirements.txt /opt/devicehub-django
|
||||
RUN pip install -r requirements.txt
|
||||
# TODO hardcoded, is ignored in requirements.txt
|
||||
RUN pip install -i https://test.pypi.org/simple/ ereuseapitest==0.0.14
|
||||
|
||||
# TODO Is there a better way?
|
||||
# Set PYTHONPATH to include the directory with the xapian module
|
||||
ENV PYTHONPATH="${PYTHONPATH}:/usr/lib/python3/dist-packages"
|
||||
|
||||
COPY docker/devicehub-django.entrypoint.sh /
|
||||
|
||||
RUN chown -R app:app /opt/devicehub-django
|
||||
|
||||
USER app
|
||||
ENTRYPOINT sh /devicehub-django.entrypoint.sh
|
||||
|
|
|
@ -5,6 +5,149 @@ set -u
|
|||
# DEBUG
|
||||
set -x
|
||||
|
||||
# TODO there is a conflict between two shared vars
|
||||
# 1. from the original docker compose devicehub-teal
|
||||
# 2. from the new docker compose that integrates all dpp services
|
||||
wait_for_dpp_shared() {
|
||||
while true; do
|
||||
# specially ensure VERAMO_API_CRED_FILE is not empty,
|
||||
# it takes some time to get data in
|
||||
OPERATOR_TOKEN_FILE='operator-token.txt'
|
||||
if [ -f "/shared/${OPERATOR_TOKEN_FILE}" ] && \
|
||||
[ -f "/shared/create_user_operator_finished" ]; then
|
||||
sleep 5
|
||||
echo "Files ready to process."
|
||||
break
|
||||
else
|
||||
echo "Waiting for file in shared: ${OPERATOR_TOKEN_FILE}"
|
||||
sleep 5
|
||||
fi
|
||||
done
|
||||
}
|
||||
|
||||
# 3. Generate an environment .env file.
|
||||
# TODO cargar via shared
|
||||
gen_env_vars() {
|
||||
INIT_ORG="${INIT_ORG:-example-org}"
|
||||
INIT_USER="${INIT_USER:-user@example.org}"
|
||||
INIT_PASSWD="${INIT_PASSWD:-1234}"
|
||||
ADMIN='True'
|
||||
PREDEFINED_TOKEN="${PREDEFINED_TOKEN:-}"
|
||||
# specific dpp env vars
|
||||
if [ "${DPP:-}" = 'true' ]; then
|
||||
# fill env vars in this docker entrypoint
|
||||
wait_for_dpp_shared
|
||||
export API_DLT='http://api_connector:3010'
|
||||
export API_DLT_TOKEN="$(cat "/shared/${OPERATOR_TOKEN_FILE}")"
|
||||
export API_RESOLVER='http://id_index_api:3012'
|
||||
# TODO hardcoded
|
||||
export ID_FEDERATED='DH1'
|
||||
# propagate to .env
|
||||
dpp_env_vars="$(cat <<END
|
||||
API_DLT=${API_DLT}
|
||||
API_DLT_TOKEN=${API_DLT_TOKEN}
|
||||
API_RESOLVER=${API_RESOLVER}
|
||||
ID_FEDERATED=${ID_FEDERATED}
|
||||
END
|
||||
)"
|
||||
# generate config using env vars from docker
|
||||
# TODO rethink if this is needed because now this is django, not flask
|
||||
cat > .env <<END
|
||||
${dpp_env_vars:-}
|
||||
END
|
||||
fi
|
||||
}
|
||||
|
||||
handle_federated_id() {
|
||||
|
||||
# devicehub host and id federated checker
|
||||
|
||||
# //getAll queries are not accepted by this service, so we remove them
|
||||
EXPECTED_ID_FEDERATED="$(curl -s "${API_RESOLVER%/}/getAll" \
|
||||
| jq -r '.url | to_entries | .[] | select(.value == "'"${DEVICEHUB_HOST}"'") | .key' \
|
||||
| head -n 1)"
|
||||
|
||||
# if is a new DEVICEHUB_HOST, then register it
|
||||
if [ -z "${EXPECTED_ID_FEDERATED}" ]; then
|
||||
# TODO better docker compose run command
|
||||
cmd="docker compose run --entrypoint= devicehub flask dlt_insert_members ${DEVICEHUB_HOST}"
|
||||
big_error "No FEDERATED ID maybe you should run \`${cmd}\`"
|
||||
fi
|
||||
|
||||
# if not new DEVICEHUB_HOST, then check consistency
|
||||
|
||||
# if there is already an ID in the DLT, it should match with my internal ID
|
||||
if [ ! "${EXPECTED_ID_FEDERATED}" = "${ID_FEDERATED}" ]; then
|
||||
|
||||
big_error "ID_FEDERATED should be ${EXPECTED_ID_FEDERATED} instead of ${ID_FEDERATED}"
|
||||
fi
|
||||
|
||||
# not needed, but reserved
|
||||
# EXPECTED_DEVICEHUB_HOST="$(curl -s "${API_RESOLVER%/}/getAll" \
|
||||
# | jq -r '.url | to_entries | .[] | select(.key == "'"${ID_FEDERATED}"'") | .value' \
|
||||
# | head -n 1)"
|
||||
# if [ ! "${EXPECTED_DEVICEHUB_HOST}" = "${DEVICEHUB_HOST}" ]; then
|
||||
# big_error "ERROR: DEVICEHUB_HOST should be ${EXPECTED_DEVICEHUB_HOST} instead of ${DEVICEHUB_HOST}"
|
||||
# fi
|
||||
|
||||
}
|
||||
|
||||
config_dpp_part1() {
|
||||
# 12. Add a new server to the 'api resolver'
|
||||
if [ "${ID_SERVICE:-}" ]; then
|
||||
handle_federated_id
|
||||
else
|
||||
# TODO when this runs more than one time per service, this is a problem, but for the docker-reset.sh workflow, that's fine
|
||||
# TODO put this in already_configured
|
||||
# TODO hardcoded http proto and port
|
||||
./manage.py dlt_insert_members "http://${DOMAIN}:8000"
|
||||
fi
|
||||
|
||||
# 13. Do a rsync api resolve
|
||||
./manage.py dlt_rsync_members
|
||||
|
||||
# 14. Register a new user to the DLT
|
||||
DATASET_FILE='/tmp/dataset.json'
|
||||
cat > "${DATASET_FILE}" <<END
|
||||
{
|
||||
"email": "${INIT_USER}",
|
||||
"password": "${INIT_PASSWD}",
|
||||
"api_token": "${API_DLT_TOKEN}"
|
||||
}
|
||||
END
|
||||
./manage.py dlt_register_user "${DATASET_FILE}"
|
||||
}
|
||||
|
||||
config_phase() {
|
||||
# TODO review this flag file
|
||||
init_flagfile="${program_dir}/already_configured"
|
||||
if [ ! -f "${init_flagfile}" ]; then
|
||||
|
||||
# non DL user (only for the inventory)
|
||||
./manage.py add_institution "${INIT_ORG}"
|
||||
# TODO: one error on add_user, and you don't add user anymore
|
||||
./manage.py add_user "${INIT_ORG}" "${INIT_USER}" "${INIT_PASSWD}" "${ADMIN}" "${PREDEFINED_TOKEN}"
|
||||
|
||||
if [ "${DPP:-}" = 'true' ]; then
|
||||
# 12, 13, 14
|
||||
config_dpp_part1
|
||||
|
||||
# cleanup other spnapshots and copy dlt/dpp snapshots
|
||||
# TODO make this better
|
||||
rm example/snapshots/*
|
||||
cp example/dpp-snapshots/*.json example/snapshots/
|
||||
fi
|
||||
|
||||
# # 15. Add inventory snapshots for user "${INIT_USER}".
|
||||
if [ "${DEMO:-}" = 'true' ]; then
|
||||
/usr/bin/time ./manage.py up_snapshots example/snapshots/ "${INIT_USER}"
|
||||
fi
|
||||
|
||||
# remain next command as the last operation for this if conditional
|
||||
touch "${init_flagfile}"
|
||||
fi
|
||||
}
|
||||
|
||||
check_app_is_there() {
|
||||
if [ ! -f "./manage.py" ]; then
|
||||
usage
|
||||
|
@ -13,7 +156,7 @@ check_app_is_there() {
|
|||
|
||||
deploy() {
|
||||
# TODO this is weird, find better workaround
|
||||
git config --global --add safe.directory /opt/devicehub-django
|
||||
git config --global --add safe.directory "${program_dir}"
|
||||
export COMMIT=$(git log --format="%H %ad" --date=iso -n 1)
|
||||
|
||||
if [ "${DEBUG:-}" = 'true' ]; then
|
||||
|
@ -31,18 +174,7 @@ deploy() {
|
|||
# inspired by https://medium.com/analytics-vidhya/django-with-docker-and-docker-compose-python-part-2-8415976470cc
|
||||
echo "INFO detected NEW deployment"
|
||||
./manage.py migrate
|
||||
INIT_ORG="${INIT_ORG:-example-org}"
|
||||
INIT_USER="${INIT_USER:-user@example.org}"
|
||||
INIT_PASSWD="${INIT_PASSWD:-1234}"
|
||||
ADMIN='True'
|
||||
PREDEFINED_TOKEN="${PREDEFINED_TOKEN:-}"
|
||||
./manage.py add_institution "${INIT_ORG}"
|
||||
# TODO: one error on add_user, and you don't add user anymore
|
||||
./manage.py add_user "${INIT_ORG}" "${INIT_USER}" "${INIT_PASSWD}" "${ADMIN}" "${PREDEFINED_TOKEN}"
|
||||
|
||||
if [ "${DEMO:-}" = 'true' ]; then
|
||||
./manage.py up_snapshots example/snapshots/ "${INIT_USER}"
|
||||
fi
|
||||
config_phase
|
||||
fi
|
||||
}
|
||||
|
||||
|
@ -70,6 +202,7 @@ runserver() {
|
|||
main() {
|
||||
program_dir='/opt/devicehub-django'
|
||||
cd "${program_dir}"
|
||||
gen_env_vars
|
||||
deploy
|
||||
runserver
|
||||
}
|
||||
|
|
0
dpp/__init__.py
Normal file
0
dpp/__init__.py
Normal file
3
dpp/admin.py
Normal file
3
dpp/admin.py
Normal file
|
@ -0,0 +1,3 @@
|
|||
from django.contrib import admin
|
||||
|
||||
# Register your models here.
|
166
dpp/api_dlt.py
Normal file
166
dpp/api_dlt.py
Normal file
|
@ -0,0 +1,166 @@
|
|||
import json
|
||||
import time
|
||||
import logging
|
||||
|
||||
from django.conf import settings
|
||||
from ereuseapi.methods import API
|
||||
|
||||
from dpp.models import Proof, UserDpp
|
||||
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
|
||||
# """The code of the status response of api dlt."""
|
||||
STATUS_CODE = {
|
||||
"Success": 201,
|
||||
"Notwork": 400
|
||||
}
|
||||
|
||||
|
||||
ALGORITHM = "sha3-256"
|
||||
|
||||
|
||||
PROOF_TYPE = {
|
||||
'Register': 'Register',
|
||||
'IssueDPP': 'IssueDPP',
|
||||
'proof_of_recycling': 'proof_of_recycling',
|
||||
'Erase': 'Erase',
|
||||
'EWaste': 'EWaste',
|
||||
}
|
||||
|
||||
|
||||
def connect_api(user):
|
||||
|
||||
dp = UserDpp.objects.filter(user=user).first()
|
||||
if not dp:
|
||||
return
|
||||
|
||||
api_dlt = settings.API_DLT
|
||||
token_dlt = dp.api_keys_dlt
|
||||
|
||||
if not api_dlt or not token_dlt:
|
||||
logger.error("NOT POSSIBLE CONNECT WITH API DLT!!!")
|
||||
return
|
||||
|
||||
return API(api_dlt, token_dlt, "ethereum")
|
||||
|
||||
|
||||
def register_dlt(api, chid, phid, proof_type=None):
|
||||
if proof_type:
|
||||
return api.generate_proof(
|
||||
chid,
|
||||
ALGORITHM,
|
||||
phid,
|
||||
proof_type,
|
||||
settings.ID_FEDERATED
|
||||
)
|
||||
|
||||
return api.register_device(
|
||||
chid,
|
||||
ALGORITHM,
|
||||
phid,
|
||||
settings.ID_FEDERATED
|
||||
)
|
||||
|
||||
|
||||
def issuer_dpp_dlt(api, dpp):
|
||||
phid = dpp.split(":")[1]
|
||||
|
||||
return api.issue_passport(
|
||||
dpp,
|
||||
ALGORITHM,
|
||||
phid,
|
||||
settings.ID_FEDERATED
|
||||
)
|
||||
|
||||
|
||||
|
||||
def save_proof(signature, ev_uuid, result, proof_type, user):
|
||||
if result['Status'] == STATUS_CODE.get("Success"):
|
||||
timestamp = result.get('Data', {}).get('data', {}).get('timestamp')
|
||||
|
||||
if not timestamp:
|
||||
return
|
||||
|
||||
logger.debug("timestamp: %s", timestamp)
|
||||
d = {
|
||||
"type": proof_type,
|
||||
"timestamp": timestamp,
|
||||
"issuer": user.institution,
|
||||
"user": user,
|
||||
"uuid": ev_uuid,
|
||||
"signature": signature,
|
||||
}
|
||||
Proof.objects.create(**d)
|
||||
|
||||
|
||||
def register_device_dlt(chid, phid, ev_uuid, user):
|
||||
cny_a = 1
|
||||
while cny_a:
|
||||
api = connect_api(user)
|
||||
if not api:
|
||||
cny_a = 0
|
||||
return
|
||||
|
||||
result = register_dlt(api, chid, phid)
|
||||
try:
|
||||
assert result['Status'] == STATUS_CODE.get("Success")
|
||||
assert result['Data']['data']['timestamp']
|
||||
cny_a = 0
|
||||
except Exception:
|
||||
if result.get("Data") != "Device already exists":
|
||||
logger.error("API return: %s", result)
|
||||
time.sleep(10)
|
||||
else:
|
||||
cny_a = 0
|
||||
|
||||
save_proof(phid, ev_uuid, result, PROOF_TYPE['Register'], user)
|
||||
|
||||
|
||||
# TODO is neccesary?
|
||||
if settings.ID_FEDERATED:
|
||||
cny = 1
|
||||
while cny:
|
||||
try:
|
||||
api.add_service(
|
||||
chid,
|
||||
'DeviceHub',
|
||||
settings.ID_FEDERATED,
|
||||
'Inventory service',
|
||||
'Inv',
|
||||
)
|
||||
cny = 0
|
||||
except Exception:
|
||||
time.sleep(10)
|
||||
|
||||
|
||||
def register_passport_dlt(chid, phid, ev_uuid, user):
|
||||
token_dlt = settings.TOKEN_DLT
|
||||
api_dlt = settings.API_DLT
|
||||
if not token_dlt or not api_dlt:
|
||||
return
|
||||
|
||||
dpp = "{chid}:{phid}".format(chid=chid, phid=phid)
|
||||
if Proof.objects.filter(signature=dpp, type=PROOF_TYPE['IssueDPP']).exists():
|
||||
return
|
||||
|
||||
cny_a = 1
|
||||
while cny_a:
|
||||
try:
|
||||
api = connect_api(user)
|
||||
if not api:
|
||||
cny_a = 0
|
||||
return
|
||||
|
||||
result = issuer_dpp_dlt(api, dpp)
|
||||
cny_a = 0
|
||||
except Exception as err:
|
||||
logger.error("ERROR API issue passport return: %s", err)
|
||||
time.sleep(10)
|
||||
|
||||
if result['Status'] is not STATUS_CODE.get("Success"):
|
||||
logger.error("ERROR API issue passport return: %s", result)
|
||||
return
|
||||
|
||||
save_proof(phid, ev_uuid, result, PROOF_TYPE['IssueDPP'], user)
|
6
dpp/apps.py
Normal file
6
dpp/apps.py
Normal file
|
@ -0,0 +1,6 @@
|
|||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class DppConfig(AppConfig):
|
||||
default_auto_field = "django.db.models.BigAutoField"
|
||||
name = "dpp"
|
35
dpp/management/commands/dlt_insert_members.py
Normal file
35
dpp/management/commands/dlt_insert_members.py
Normal file
|
@ -0,0 +1,35 @@
|
|||
import logging
|
||||
import requests
|
||||
|
||||
from django.core.management.base import BaseCommand
|
||||
from django.conf import settings
|
||||
from user.models import Institution
|
||||
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Insert a new Institution in DLT"
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument('domain', type=str, help='institution')
|
||||
|
||||
def handle(self, *args, **kwargs):
|
||||
domain = kwargs.get("domain")
|
||||
api = settings.API_RESOLVER
|
||||
if not api:
|
||||
logger.error("you need set the var API_RESOLVER")
|
||||
return
|
||||
|
||||
if "http" not in domain:
|
||||
logger.error("you need put https:// in %s", domain)
|
||||
return
|
||||
|
||||
api = api.strip("/")
|
||||
domain = domain.strip("/")
|
||||
|
||||
data = {"url": domain}
|
||||
url = api + '/registerURL'
|
||||
res = requests.post(url, json=data)
|
||||
print(res.json())
|
72
dpp/management/commands/dlt_register_user.py
Normal file
72
dpp/management/commands/dlt_register_user.py
Normal file
|
@ -0,0 +1,72 @@
|
|||
import json
|
||||
import logging
|
||||
|
||||
from ereuseapi.methods import API
|
||||
from django.conf import settings
|
||||
from django.core.management.base import BaseCommand
|
||||
from user.models import User, Institution
|
||||
from dpp.models import UserDpp
|
||||
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Insert users than are in Dlt with params: path of data set file"
|
||||
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument('dataset_file', type=str, help='institution')
|
||||
|
||||
def handle(self, *args, **kwargs):
|
||||
dataset_file = kwargs.get("dataset_file")
|
||||
self.api_dlt = settings.API_DLT
|
||||
self.institution = Institution.objects.filter().first()
|
||||
if not self.api_dlt:
|
||||
logger.error("you need set the var API_DLT")
|
||||
return
|
||||
|
||||
self.api_dlt = self.api_dlt.strip("/")
|
||||
|
||||
with open(dataset_file) as f:
|
||||
dataset = json.loads(f.read())
|
||||
|
||||
self.add_user(dataset)
|
||||
|
||||
def add_user(self, data):
|
||||
email = data.get("email")
|
||||
password = data.get("password")
|
||||
api_token = data.get("api_token")
|
||||
# ethereum = {"data": {"api_token": api_token}}
|
||||
# data_eth = json.dumps(ethereum)
|
||||
data_eth = json.dumps(api_token)
|
||||
# TODO encrypt in the future
|
||||
# api_keys_dlt = encrypt(password, data_eth)
|
||||
api_keys_dlt = data_eth.strip('"').strip("'")
|
||||
|
||||
user = User.objects.filter(email=email).first()
|
||||
|
||||
if not user:
|
||||
user = User.objects.create(
|
||||
email=email,
|
||||
password=password,
|
||||
institution = self.institution
|
||||
)
|
||||
|
||||
roles = []
|
||||
token_dlt = api_token
|
||||
api = API(self.api_dlt, token_dlt, "ethereum")
|
||||
result = api.check_user_roles()
|
||||
|
||||
if result.get('Status') == 200:
|
||||
if 'Success' in result.get('Data', {}).get('status'):
|
||||
rols = result.get('Data', {}).get('data', {})
|
||||
roles = [(k, k) for k, v in rols.items() if v]
|
||||
|
||||
roles_dlt = json.dumps(roles)
|
||||
|
||||
UserDpp.objects.create(
|
||||
roles_dlt=roles_dlt,
|
||||
api_keys_dlt=api_keys_dlt,
|
||||
user=user
|
||||
)
|
47
dpp/management/commands/dlt_rsync_members.py
Normal file
47
dpp/management/commands/dlt_rsync_members.py
Normal file
|
@ -0,0 +1,47 @@
|
|||
import logging
|
||||
import requests
|
||||
|
||||
from django.core.management.base import BaseCommand
|
||||
from django.conf import settings
|
||||
from dpp.models import MemberFederated
|
||||
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Synchronize members of DLT"
|
||||
|
||||
def handle(self, *args, **kwargs):
|
||||
api = settings.API_RESOLVER
|
||||
if not api:
|
||||
logger.error("you need set the var API_RESOLVER")
|
||||
return
|
||||
|
||||
|
||||
api = api.strip("/")
|
||||
|
||||
url = api + '/getAll'
|
||||
res = requests.get(url)
|
||||
if res.status_code != 200:
|
||||
return "Error, {}".format(res.text)
|
||||
response = res.json()
|
||||
members = response['url']
|
||||
counter = members.pop('counter')
|
||||
if counter <= MemberFederated.objects.count():
|
||||
logger.info("Synchronize members of DLT -> All Ok")
|
||||
return "All ok"
|
||||
|
||||
for k, v in members.items():
|
||||
id = self.clean_id(k)
|
||||
member = MemberFederated.objects.filter(dlt_id_provider=id).first()
|
||||
if member:
|
||||
if member.domain != v:
|
||||
member.domain = v
|
||||
member.save()
|
||||
continue
|
||||
MemberFederated.objects.create(dlt_id_provider=id, domain=v)
|
||||
return res.text
|
||||
|
||||
def clean_id(self, id):
|
||||
return int(id.split('DH')[-1])
|
52
dpp/migrations/0001_initial.py
Normal file
52
dpp/migrations/0001_initial.py
Normal file
|
@ -0,0 +1,52 @@
|
|||
# Generated by Django 5.0.6 on 2024-11-18 14:29
|
||||
|
||||
import django.db.models.deletion
|
||||
from django.conf import settings
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
("user", "0001_initial"),
|
||||
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name="Proof",
|
||||
fields=[
|
||||
(
|
||||
"id",
|
||||
models.BigAutoField(
|
||||
auto_created=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
verbose_name="ID",
|
||||
),
|
||||
),
|
||||
("timestamp", models.IntegerField()),
|
||||
("uuid", models.UUIDField()),
|
||||
("signature", models.CharField(max_length=256)),
|
||||
("type", models.CharField(max_length=256)),
|
||||
(
|
||||
"issuer",
|
||||
models.ForeignKey(
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
to="user.institution",
|
||||
),
|
||||
),
|
||||
(
|
||||
"user",
|
||||
models.ForeignKey(
|
||||
blank=True,
|
||||
null=True,
|
||||
on_delete=django.db.models.deletion.SET_NULL,
|
||||
to=settings.AUTH_USER_MODEL,
|
||||
),
|
||||
),
|
||||
],
|
||||
),
|
||||
]
|
25
dpp/migrations/0002_memberfederated.py
Normal file
25
dpp/migrations/0002_memberfederated.py
Normal file
|
@ -0,0 +1,25 @@
|
|||
# Generated by Django 5.0.6 on 2024-11-19 19:18
|
||||
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("dpp", "0001_initial"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name="MemberFederated",
|
||||
fields=[
|
||||
(
|
||||
"dlt_id_provider",
|
||||
models.IntegerField(primary_key=True, serialize=False),
|
||||
),
|
||||
("domain", models.CharField(max_length=256)),
|
||||
("client_id", models.CharField(max_length=256)),
|
||||
("client_secret", models.CharField(max_length=256)),
|
||||
],
|
||||
),
|
||||
]
|
60
dpp/migrations/0003_memberfederated_institution_and_more.py
Normal file
60
dpp/migrations/0003_memberfederated_institution_and_more.py
Normal file
|
@ -0,0 +1,60 @@
|
|||
# Generated by Django 5.0.6 on 2024-11-20 10:51
|
||||
|
||||
import django.db.models.deletion
|
||||
from django.conf import settings
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("dpp", "0002_memberfederated"),
|
||||
("user", "0001_initial"),
|
||||
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AddField(
|
||||
model_name="memberfederated",
|
||||
name="institution",
|
||||
field=models.ForeignKey(
|
||||
blank=True,
|
||||
null=True,
|
||||
on_delete=django.db.models.deletion.SET_NULL,
|
||||
to="user.institution",
|
||||
),
|
||||
),
|
||||
migrations.AlterField(
|
||||
model_name="memberfederated",
|
||||
name="client_id",
|
||||
field=models.CharField(max_length=256, null=True),
|
||||
),
|
||||
migrations.AlterField(
|
||||
model_name="memberfederated",
|
||||
name="client_secret",
|
||||
field=models.CharField(max_length=256, null=True),
|
||||
),
|
||||
migrations.CreateModel(
|
||||
name="UserDpp",
|
||||
fields=[
|
||||
(
|
||||
"id",
|
||||
models.BigAutoField(
|
||||
auto_created=True,
|
||||
primary_key=True,
|
||||
serialize=False,
|
||||
verbose_name="ID",
|
||||
),
|
||||
),
|
||||
("roles_dlt", models.TextField()),
|
||||
("api_keys_dlt", models.TextField()),
|
||||
(
|
||||
"user",
|
||||
models.ForeignKey(
|
||||
on_delete=django.db.models.deletion.CASCADE,
|
||||
to=settings.AUTH_USER_MODEL,
|
||||
),
|
||||
),
|
||||
],
|
||||
),
|
||||
]
|
0
dpp/migrations/__init__.py
Normal file
0
dpp/migrations/__init__.py
Normal file
32
dpp/models.py
Normal file
32
dpp/models.py
Normal file
|
@ -0,0 +1,32 @@
|
|||
from django.db import models
|
||||
from user.models import User, Institution
|
||||
from utils.constants import STR_EXTEND_SIZE
|
||||
# Create your models here.
|
||||
|
||||
|
||||
class Proof(models.Model):
|
||||
## The signature can be a phid or dpp depending of type of Proof
|
||||
timestamp = models.IntegerField()
|
||||
uuid = models.UUIDField()
|
||||
signature = models.CharField(max_length=STR_EXTEND_SIZE)
|
||||
type = models.CharField(max_length=STR_EXTEND_SIZE)
|
||||
issuer = models.ForeignKey(Institution, on_delete=models.CASCADE)
|
||||
user = models.ForeignKey(
|
||||
User, on_delete=models.SET_NULL, null=True, blank=True)
|
||||
|
||||
|
||||
class MemberFederated(models.Model):
|
||||
dlt_id_provider = models.IntegerField(primary_key=True)
|
||||
domain = models.CharField(max_length=STR_EXTEND_SIZE)
|
||||
# This client_id and client_secret is used for connected to this domain as
|
||||
# a client and this domain then is the server of auth
|
||||
client_id = models.CharField(max_length=STR_EXTEND_SIZE, null=True)
|
||||
client_secret = models.CharField(max_length=STR_EXTEND_SIZE, null=True)
|
||||
institution = models.ForeignKey(
|
||||
Institution, on_delete=models.SET_NULL, null=True, blank=True)
|
||||
|
||||
|
||||
class UserDpp(models.Model):
|
||||
roles_dlt = models.TextField()
|
||||
api_keys_dlt = models.TextField()
|
||||
user = models.ForeignKey(User, on_delete=models.CASCADE)
|
3
dpp/tests.py
Normal file
3
dpp/tests.py
Normal file
|
@ -0,0 +1,3 @@
|
|||
from django.test import TestCase
|
||||
|
||||
# Create your tests here.
|
8
dpp/urls.py
Normal file
8
dpp/urls.py
Normal file
|
@ -0,0 +1,8 @@
|
|||
from django.urls import path
|
||||
from dpp import views
|
||||
|
||||
app_name = 'dpp'
|
||||
|
||||
urlpatterns = [
|
||||
path("<int:proof_id>/", views.ProofView.as_view(), name="proof"),
|
||||
]
|
40
dpp/views.py
Normal file
40
dpp/views.py
Normal file
|
@ -0,0 +1,40 @@
|
|||
import json
|
||||
import logging
|
||||
import hashlib
|
||||
|
||||
from django.views.generic.edit import View
|
||||
from django.http import JsonResponse
|
||||
|
||||
from dpp.api_dlt import ALGORITHM
|
||||
from evidence.models import Evidence
|
||||
from evidence.parse import Build
|
||||
from dpp.models import Proof
|
||||
|
||||
|
||||
class ProofView(View):
|
||||
|
||||
def get(self, request, *args, **kwargs):
|
||||
timestamp = kwargs.get("proof_id")
|
||||
proof = Proof.objects.filter(timestamp=timestamp).first()
|
||||
if not proof:
|
||||
return JsonResponse({}, status=404)
|
||||
|
||||
ev = Evidence(proof.uuid)
|
||||
if not ev.doc:
|
||||
return JsonResponse({}, status=404)
|
||||
|
||||
dev = Build(ev.doc, None, check=True)
|
||||
doc = dev.get_phid()
|
||||
|
||||
data = {
|
||||
"algorithm": ALGORITHM,
|
||||
"document": json.dumps(doc)
|
||||
}
|
||||
|
||||
d = {
|
||||
'@context': ['https://ereuse.org/proof0.json'],
|
||||
'data': data,
|
||||
}
|
||||
response = JsonResponse(d, status=200)
|
||||
response["Access-Control-Allow-Origin"] = "*"
|
||||
return response
|
|
@ -29,17 +29,18 @@ class UploadForm(forms.Form):
|
|||
|
||||
try:
|
||||
file_json = json.loads(file_data)
|
||||
Build(file_json, None, check=True)
|
||||
build = Build
|
||||
snap = build(file_json, None, check=True)
|
||||
exist_annotation = Annotation.objects.filter(
|
||||
uuid=file_json['uuid']
|
||||
uuid=snap.uuid
|
||||
).first()
|
||||
|
||||
|
||||
if exist_annotation:
|
||||
raise ValidationError(
|
||||
raise ValidationError(
|
||||
_("The snapshot already exists"),
|
||||
code="duplicate_snapshot",
|
||||
)
|
||||
|
||||
|
||||
#Catch any error and display it as Validation Error so the Form handles it
|
||||
except Exception as e:
|
||||
raise ValidationError(
|
||||
|
@ -57,7 +58,9 @@ class UploadForm(forms.Form):
|
|||
|
||||
for ev in self.evidences:
|
||||
path_name = save_in_disk(ev[1], user.institution.name)
|
||||
Build(ev[1], user)
|
||||
build = Build
|
||||
file_json = ev[1]
|
||||
build(file_json, user)
|
||||
move_json(path_name, user.institution.name)
|
||||
|
||||
|
||||
|
@ -221,7 +224,7 @@ class EraseServerForm(forms.Form):
|
|||
|
||||
if self.instance:
|
||||
return
|
||||
|
||||
|
||||
Annotation.objects.create(
|
||||
uuid=self.uuid,
|
||||
type=Annotation.Type.ERASE_SERVER,
|
||||
|
|
69
evidence/legacy_parse.py
Normal file
69
evidence/legacy_parse.py
Normal file
|
@ -0,0 +1,69 @@
|
|||
import json
|
||||
import logging
|
||||
|
||||
from dmidecode import DMIParse
|
||||
from json_repair import repair_json
|
||||
from evidence.mixin_parse import BuildMix
|
||||
from evidence.legacy_parse_details import get_lshw_child, ParseSnapshot
|
||||
from utils.constants import CHASSIS_DH
|
||||
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
|
||||
def get_mac(lshw):
|
||||
try:
|
||||
if type(lshw) is dict:
|
||||
hw = lshw
|
||||
else:
|
||||
hw = json.loads(lshw)
|
||||
except json.decoder.JSONDecodeError:
|
||||
hw = json.loads(repair_json(lshw))
|
||||
|
||||
nets = []
|
||||
get_lshw_child(hw, nets, 'network')
|
||||
|
||||
nets_sorted = sorted(nets, key=lambda x: x['businfo'])
|
||||
|
||||
if nets_sorted:
|
||||
mac = nets_sorted[0]['serial']
|
||||
logger.debug("The snapshot has the following MAC: %s" , mac)
|
||||
return mac
|
||||
|
||||
|
||||
class Build(BuildMix):
|
||||
# This parse is for get info from snapshots created with
|
||||
# workbench-script but builded for send to devicehub-teal
|
||||
|
||||
def get_details(self):
|
||||
dmidecode_raw = self.json["data"]["dmidecode"]
|
||||
self.dmi = DMIParse(dmidecode_raw)
|
||||
|
||||
self.manufacturer = self.dmi.manufacturer().strip()
|
||||
self.model = self.dmi.model().strip()
|
||||
self.chassis = self.get_chassis_dh()
|
||||
self.serial_number = self.dmi.serial_number()
|
||||
self.sku = self.get_sku()
|
||||
self.typ = self.chassis
|
||||
self.version = self.get_version()
|
||||
|
||||
def get_chassis_dh(self):
|
||||
chassis = self.get_chassis()
|
||||
lower_type = chassis.lower()
|
||||
for k, v in CHASSIS_DH.items():
|
||||
if lower_type in v:
|
||||
return k
|
||||
return self.default
|
||||
|
||||
def get_sku(self):
|
||||
return self.dmi.get("System")[0].get("SKU Number", "n/a").strip()
|
||||
|
||||
def get_chassis(self):
|
||||
return self.dmi.get("Chassis")[0].get("Type", '_virtual') #
|
||||
|
||||
def get_version(self):
|
||||
return self.dmi.get("System")[0].get("Verson", '_virtual')
|
||||
|
||||
def _get_components(self):
|
||||
data = ParseSnapshot(self.json)
|
||||
self.components = data.components
|
503
evidence/legacy_parse_details.py
Normal file
503
evidence/legacy_parse_details.py
Normal file
|
@ -0,0 +1,503 @@
|
|||
import json
|
||||
import logging
|
||||
import numpy as np
|
||||
|
||||
from datetime import datetime
|
||||
from dmidecode import DMIParse
|
||||
from json_repair import repair_json
|
||||
|
||||
from utils.constants import CHASSIS_DH, DATASTORAGEINTERFACE
|
||||
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
|
||||
def get_lshw_child(child, nets, component):
|
||||
try:
|
||||
if child.get('id') == component:
|
||||
nets.append(child)
|
||||
if child.get('children'):
|
||||
[get_lshw_child(x, nets, component) for x in child['children']]
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
class ParseSnapshot:
|
||||
def __init__(self, snapshot, default="n/a"):
|
||||
self.default = default
|
||||
self.dmidecode_raw = snapshot["data"].get("dmidecode", "{}")
|
||||
self.smart_raw = snapshot["data"].get("disks", [])
|
||||
self.hwinfo_raw = snapshot["data"].get("hwinfo", "")
|
||||
self.lshw_raw = snapshot["data"].get("lshw", {}) or {}
|
||||
self.lscpi_raw = snapshot["data"].get("lspci", "")
|
||||
self.device = {"actions": []}
|
||||
self.components = []
|
||||
self.monitors = []
|
||||
|
||||
self.dmi = DMIParse(self.dmidecode_raw)
|
||||
self.smart = self.loads(self.smart_raw)
|
||||
self.lshw = self.loads(self.lshw_raw)
|
||||
self.hwinfo = self.parse_hwinfo()
|
||||
|
||||
self.set_computer()
|
||||
self.get_hwinfo_monitors()
|
||||
self.set_components()
|
||||
self.snapshot_json = {
|
||||
"type": "Snapshot",
|
||||
"device": self.device,
|
||||
"software": snapshot["software"],
|
||||
"components": self.components,
|
||||
"uuid": snapshot['uuid'],
|
||||
"version": snapshot['version'],
|
||||
"endTime": snapshot["timestamp"],
|
||||
"elapsed": 1,
|
||||
}
|
||||
|
||||
def set_computer(self):
|
||||
self.device['manufacturer'] = self.dmi.manufacturer().strip()
|
||||
self.device['model'] = self.dmi.model().strip()
|
||||
self.device['serialNumber'] = self.dmi.serial_number()
|
||||
self.device['type'] = self.get_type()
|
||||
self.device['sku'] = self.get_sku()
|
||||
self.device['version'] = self.get_version()
|
||||
self.device['system_uuid'] = self.get_uuid()
|
||||
self.device['family'] = self.get_family()
|
||||
self.device['chassis'] = self.get_chassis_dh()
|
||||
|
||||
def set_components(self):
|
||||
self.get_cpu()
|
||||
self.get_ram()
|
||||
self.get_mother_board()
|
||||
self.get_graphic()
|
||||
self.get_data_storage()
|
||||
self.get_display()
|
||||
self.get_sound_card()
|
||||
self.get_networks()
|
||||
|
||||
def get_cpu(self):
|
||||
for cpu in self.dmi.get('Processor'):
|
||||
serial = cpu.get('Serial Number')
|
||||
if serial == 'Not Specified' or not serial:
|
||||
serial = cpu.get('ID').replace(' ', '')
|
||||
self.components.append(
|
||||
{
|
||||
"actions": [],
|
||||
"type": "Processor",
|
||||
"speed": self.get_cpu_speed(cpu),
|
||||
"cores": int(cpu.get('Core Count', 1)),
|
||||
"model": cpu.get('Version'),
|
||||
"threads": int(cpu.get('Thread Count', 1)),
|
||||
"manufacturer": cpu.get('Manufacturer'),
|
||||
"serialNumber": serial,
|
||||
"brand": cpu.get('Family'),
|
||||
"address": self.get_cpu_address(cpu),
|
||||
"bogomips": self.get_bogomips(),
|
||||
}
|
||||
)
|
||||
|
||||
def get_ram(self):
|
||||
for ram in self.dmi.get("Memory Device"):
|
||||
if ram.get('size') == 'No Module Installed':
|
||||
continue
|
||||
if not ram.get("Speed"):
|
||||
continue
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"actions": [],
|
||||
"type": "RamModule",
|
||||
"size": self.get_ram_size(ram),
|
||||
"speed": self.get_ram_speed(ram),
|
||||
"manufacturer": ram.get("Manufacturer", self.default),
|
||||
"serialNumber": ram.get("Serial Number", self.default),
|
||||
"interface": ram.get("Type", "DDR"),
|
||||
"format": ram.get("Form Factor", "DIMM"),
|
||||
"model": ram.get("Part Number", self.default),
|
||||
}
|
||||
)
|
||||
|
||||
def get_mother_board(self):
|
||||
for moder_board in self.dmi.get("Baseboard"):
|
||||
self.components.append(
|
||||
{
|
||||
"actions": [],
|
||||
"type": "Motherboard",
|
||||
"version": moder_board.get("Version"),
|
||||
"serialNumber": moder_board.get("Serial Number", "").strip(),
|
||||
"manufacturer": moder_board.get("Manufacturer", "").strip(),
|
||||
"biosDate": self.get_bios_date(),
|
||||
"ramMaxSize": self.get_max_ram_size(),
|
||||
"ramSlots": len(self.dmi.get("Memory Device")),
|
||||
"slots": self.get_ram_slots(),
|
||||
"model": moder_board.get("Product Name", "").strip(),
|
||||
"firewire": self.get_firmware_num(),
|
||||
"pcmcia": self.get_pcmcia_num(),
|
||||
"serial": self.get_serial_num(),
|
||||
"usb": self.get_usb_num(),
|
||||
}
|
||||
)
|
||||
|
||||
def get_graphic(self):
|
||||
displays = []
|
||||
get_lshw_child(self.lshw, displays, 'display')
|
||||
|
||||
for c in displays:
|
||||
if not c['configuration'].get('driver', None):
|
||||
continue
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"actions": [],
|
||||
"type": "GraphicCard",
|
||||
"memory": "",
|
||||
"manufacturer": c.get("vendor", self.default),
|
||||
"model": c.get("product", self.default),
|
||||
"serialNumber": c.get("serial", self.default),
|
||||
}
|
||||
)
|
||||
|
||||
def get_data_storage(self):
|
||||
for sm in self.smart:
|
||||
if sm.get('smartctl', {}).get('exit_status') == 1:
|
||||
continue
|
||||
model = sm.get('model_name')
|
||||
manufacturer = None
|
||||
hours = sm.get("power_on_time", {}).get("hours", 0)
|
||||
if model and len(model.split(" ")) > 1:
|
||||
mm = model.split(" ")
|
||||
model = mm[-1]
|
||||
manufacturer = " ".join(mm[:-1])
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"actions": self.sanitize(sm),
|
||||
"type": self.get_data_storage_type(sm),
|
||||
"model": model,
|
||||
"manufacturer": manufacturer,
|
||||
"serialNumber": sm.get('serial_number'),
|
||||
"size": self.get_data_storage_size(sm),
|
||||
"variant": sm.get("firmware_version"),
|
||||
"interface": self.get_data_storage_interface(sm),
|
||||
"hours": hours,
|
||||
}
|
||||
)
|
||||
|
||||
def sanitize(self, action):
|
||||
return []
|
||||
|
||||
def get_bogomips(self):
|
||||
if not self.hwinfo:
|
||||
return self.default
|
||||
|
||||
bogomips = 0
|
||||
for row in self.hwinfo:
|
||||
for cel in row:
|
||||
if 'BogoMips' in cel:
|
||||
try:
|
||||
bogomips += float(cel.split(":")[-1])
|
||||
except Exception:
|
||||
pass
|
||||
return bogomips
|
||||
|
||||
def get_networks(self):
|
||||
networks = []
|
||||
get_lshw_child(self.lshw, networks, 'network')
|
||||
|
||||
for c in networks:
|
||||
capacity = c.get('capacity')
|
||||
wireless = bool(c.get('configuration', {}).get('wireless', False))
|
||||
self.components.append(
|
||||
{
|
||||
"actions": [],
|
||||
"type": "NetworkAdapter",
|
||||
"model": c.get('product'),
|
||||
"manufacturer": c.get('vendor'),
|
||||
"serialNumber": c.get('serial'),
|
||||
"speed": capacity,
|
||||
"variant": c.get('version', 1),
|
||||
"wireless": wireless or False,
|
||||
"integrated": "PCI:0000:00" in c.get("businfo", ""),
|
||||
}
|
||||
)
|
||||
|
||||
def get_sound_card(self):
|
||||
multimedias = []
|
||||
get_lshw_child(self.lshw, multimedias, 'multimedia')
|
||||
|
||||
for c in multimedias:
|
||||
self.components.append(
|
||||
{
|
||||
"actions": [],
|
||||
"type": "SoundCard",
|
||||
"model": c.get('product'),
|
||||
"manufacturer": c.get('vendor'),
|
||||
"serialNumber": c.get('serial'),
|
||||
}
|
||||
)
|
||||
|
||||
def get_display(self): # noqa: C901
|
||||
TECHS = 'CRT', 'TFT', 'LED', 'PDP', 'LCD', 'OLED', 'AMOLED'
|
||||
|
||||
for c in self.monitors:
|
||||
resolution_width, resolution_height = (None,) * 2
|
||||
refresh, serial, model, manufacturer, size = (None,) * 5
|
||||
year, week, production_date = (None,) * 3
|
||||
|
||||
for x in c:
|
||||
if "Vendor: " in x:
|
||||
manufacturer = x.split('Vendor: ')[-1].strip()
|
||||
if "Model: " in x:
|
||||
model = x.split('Model: ')[-1].strip()
|
||||
if "Serial ID: " in x:
|
||||
serial = x.split('Serial ID: ')[-1].strip()
|
||||
if " Resolution: " in x:
|
||||
rs = x.split(' Resolution: ')[-1].strip()
|
||||
if 'x' in rs:
|
||||
resolution_width, resolution_height = [
|
||||
int(r) for r in rs.split('x')
|
||||
]
|
||||
if "Frequencies: " in x:
|
||||
try:
|
||||
refresh = int(float(x.split(',')[-1].strip()[:-3]))
|
||||
except Exception:
|
||||
pass
|
||||
if 'Year of Manufacture' in x:
|
||||
year = x.split(': ')[1]
|
||||
|
||||
if 'Week of Manufacture' in x:
|
||||
week = x.split(': ')[1]
|
||||
|
||||
if "Size: " in x:
|
||||
size = self.get_size_monitor(x)
|
||||
technology = next((t for t in TECHS if t in c[0]), None)
|
||||
|
||||
if year and week:
|
||||
d = '{} {} 0'.format(year, week)
|
||||
production_date = datetime.strptime(d, '%Y %W %w').isoformat()
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"actions": [],
|
||||
"type": "Display",
|
||||
"model": model,
|
||||
"manufacturer": manufacturer,
|
||||
"serialNumber": serial,
|
||||
'size': size,
|
||||
'resolutionWidth': resolution_width,
|
||||
'resolutionHeight': resolution_height,
|
||||
"productionDate": production_date,
|
||||
'technology': technology,
|
||||
'refreshRate': refresh,
|
||||
}
|
||||
)
|
||||
|
||||
def get_hwinfo_monitors(self):
|
||||
for c in self.hwinfo:
|
||||
monitor = None
|
||||
external = None
|
||||
for x in c:
|
||||
if 'Hardware Class: monitor' in x:
|
||||
monitor = c
|
||||
if 'Driver Info' in x:
|
||||
external = c
|
||||
|
||||
if monitor and not external:
|
||||
self.monitors.append(c)
|
||||
|
||||
def get_size_monitor(self, x):
|
||||
i = 1 / 25.4
|
||||
t = x.split('Size: ')[-1].strip()
|
||||
tt = t.split('mm')
|
||||
if not tt:
|
||||
return 0
|
||||
sizes = tt[0].strip()
|
||||
if 'x' not in sizes:
|
||||
return 0
|
||||
w, h = [int(x) for x in sizes.split('x')]
|
||||
return "{:.2f}".format(np.sqrt(w**2 + h**2) * i)
|
||||
|
||||
def get_cpu_address(self, cpu):
|
||||
default = 64
|
||||
|
||||
try:
|
||||
for ch in self.lshw.get('children', []):
|
||||
for c in ch.get('children', []):
|
||||
if c['class'] == 'processor':
|
||||
return c.get('width', default)
|
||||
except:
|
||||
return default
|
||||
return default
|
||||
|
||||
def get_usb_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "USB" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_serial_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "SERIAL" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_firmware_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "FIRMWARE" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_pcmcia_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "PCMCIA" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_bios_date(self):
|
||||
return self.dmi.get("BIOS")[0].get("Release Date", self.default)
|
||||
|
||||
def get_firmware(self):
|
||||
return self.dmi.get("BIOS")[0].get("Firmware Revision", '1')
|
||||
|
||||
def get_max_ram_size(self):
|
||||
size = 0
|
||||
for slot in self.dmi.get("Physical Memory Array"):
|
||||
capacity = slot.get("Maximum Capacity", '0').split(" ")[0]
|
||||
size += int(capacity)
|
||||
|
||||
return size
|
||||
|
||||
def get_ram_slots(self):
|
||||
slots = 0
|
||||
for x in self.dmi.get("Physical Memory Array"):
|
||||
slots += int(x.get("Number Of Devices", 0))
|
||||
return slots
|
||||
|
||||
def get_ram_size(self, ram):
|
||||
memory = ram.get("Size", "0")
|
||||
return memory
|
||||
|
||||
def get_ram_speed(self, ram):
|
||||
size = ram.get("Speed", "0")
|
||||
return size
|
||||
|
||||
def get_cpu_speed(self, cpu):
|
||||
speed = cpu.get('Max Speed', "0")
|
||||
return speed
|
||||
|
||||
def get_sku(self):
|
||||
return self.dmi.get("System")[0].get("SKU Number", self.default).strip()
|
||||
|
||||
def get_version(self):
|
||||
return self.dmi.get("System")[0].get("Version", self.default).strip()
|
||||
|
||||
def get_uuid(self):
|
||||
return self.dmi.get("System")[0].get("UUID", '').strip()
|
||||
|
||||
def get_family(self):
|
||||
return self.dmi.get("System")[0].get("Family", '')
|
||||
|
||||
def get_chassis(self):
|
||||
return self.dmi.get("Chassis")[0].get("Type", '_virtual')
|
||||
|
||||
def get_type(self):
|
||||
chassis_type = self.get_chassis()
|
||||
return self.translation_to_devicehub(chassis_type)
|
||||
|
||||
def translation_to_devicehub(self, original_type):
|
||||
lower_type = original_type.lower()
|
||||
CHASSIS_TYPE = {
|
||||
'Desktop': [
|
||||
'desktop',
|
||||
'low-profile',
|
||||
'tower',
|
||||
'docking',
|
||||
'all-in-one',
|
||||
'pizzabox',
|
||||
'mini-tower',
|
||||
'space-saving',
|
||||
'lunchbox',
|
||||
'mini',
|
||||
'stick',
|
||||
],
|
||||
'Laptop': [
|
||||
'portable',
|
||||
'laptop',
|
||||
'convertible',
|
||||
'tablet',
|
||||
'detachable',
|
||||
'notebook',
|
||||
'handheld',
|
||||
'sub-notebook',
|
||||
],
|
||||
'Server': ['server'],
|
||||
'Computer': ['_virtual'],
|
||||
}
|
||||
for k, v in CHASSIS_TYPE.items():
|
||||
if lower_type in v:
|
||||
return k
|
||||
return self.default
|
||||
|
||||
def get_chassis_dh(self):
|
||||
chassis = self.get_chassis()
|
||||
lower_type = chassis.lower()
|
||||
for k, v in CHASSIS_DH.items():
|
||||
if lower_type in v:
|
||||
return k
|
||||
return self.default
|
||||
|
||||
def get_data_storage_type(self, x):
|
||||
# TODO @cayop add more SSDS types
|
||||
SSDS = ["nvme"]
|
||||
SSD = 'SolidStateDrive'
|
||||
HDD = 'HardDrive'
|
||||
type_dev = x.get('device', {}).get('type')
|
||||
trim = x.get('trim', {}).get("supported") in [True, "true"]
|
||||
return SSD if type_dev in SSDS or trim else HDD
|
||||
|
||||
def get_data_storage_interface(self, x):
|
||||
interface = x.get('device', {}).get('protocol', 'ATA')
|
||||
if interface.upper() in DATASTORAGEINTERFACE:
|
||||
return interface.upper()
|
||||
|
||||
txt = "Sid: {}, interface {} is not in DataStorageInterface Enum".format(
|
||||
self.sid, interface
|
||||
)
|
||||
self.errors("{}".format(txt))
|
||||
|
||||
def get_data_storage_size(self, x):
|
||||
return x.get('user_capacity', {}).get('bytes')
|
||||
|
||||
def parse_hwinfo(self):
|
||||
hw_blocks = self.hwinfo_raw.split("\n\n")
|
||||
return [x.split("\n") for x in hw_blocks]
|
||||
|
||||
def loads(self, x):
|
||||
if isinstance(x, str):
|
||||
try:
|
||||
try:
|
||||
hw = json.loads(x)
|
||||
except json.decoder.JSONDecodeError:
|
||||
hw = json.loads(repair_json(x))
|
||||
return hw
|
||||
except Exception as ss:
|
||||
logger.warning("%s", ss)
|
||||
return {}
|
||||
return x
|
||||
|
||||
def errors(self, txt=None):
|
||||
if not txt:
|
||||
return self._errors
|
||||
|
||||
logger.error(txt)
|
||||
self._errors.append("%s", txt)
|
58
evidence/mixin_parse.py
Normal file
58
evidence/mixin_parse.py
Normal file
|
@ -0,0 +1,58 @@
|
|||
import logging
|
||||
from django.conf import settings
|
||||
|
||||
from utils.constants import ALGOS
|
||||
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
|
||||
class BuildMix:
|
||||
def __init__(self, evidence_json):
|
||||
self.json = evidence_json
|
||||
self.uuid = self.json.get('uuid')
|
||||
self.manufacturer = ""
|
||||
self.model = ""
|
||||
self.serial_number = ""
|
||||
self.chassis = ""
|
||||
self.sku = ""
|
||||
self.mac = ""
|
||||
self.tpy = ""
|
||||
self.version = ""
|
||||
self.get_details()
|
||||
self.generate_chids()
|
||||
|
||||
def get_hid(self, algo):
|
||||
algorithm = ALGOS.get(algo, [])
|
||||
hid = ""
|
||||
for f in algorithm:
|
||||
if hasattr(self, f):
|
||||
hid += getattr(self, f)
|
||||
return hid
|
||||
|
||||
def generate_chids(self):
|
||||
self.algorithms = {
|
||||
'hidalgo1': self.get_hid('hidalgo1'),
|
||||
}
|
||||
if settings.DPP:
|
||||
self.algorithms["legacy_dpp"] = self.get_hid("legacy_dpp")
|
||||
|
||||
def get_doc(self):
|
||||
self._get_components()
|
||||
for c in self.components:
|
||||
c.pop("actions", None)
|
||||
|
||||
components = sorted(self.components, key=lambda x: x.get("type"))
|
||||
device = self.algorithms.get('legacy_dpp')
|
||||
|
||||
doc = [("computer", device)]
|
||||
|
||||
for c in components:
|
||||
doc.append((c.get("type"), self.get_id_hw_dpp(c)))
|
||||
|
||||
def get_id_hw_dpp(self, d):
|
||||
algorithm = ALGOS.get("legacy_dpp", [])
|
||||
hid = ""
|
||||
for f in algorithm:
|
||||
hid += d.get(f, '')
|
||||
return hid
|
|
@ -1,4 +1,5 @@
|
|||
import json
|
||||
import hashlib
|
||||
|
||||
from dmidecode import DMIParse
|
||||
from django.db import models
|
||||
|
@ -6,6 +7,7 @@ from django.db import models
|
|||
from utils.constants import STR_EXTEND_SIZE, CHASSIS_DH
|
||||
from evidence.xapian import search
|
||||
from evidence.parse_details import ParseSnapshot
|
||||
from evidence.normal_parse_details import get_inxi, get_inxi_key
|
||||
from user.models import User, Institution
|
||||
|
||||
|
||||
|
@ -39,6 +41,7 @@ class Evidence:
|
|||
self.doc = None
|
||||
self.created = None
|
||||
self.dmi = None
|
||||
self.inxi = None
|
||||
self.annotations = []
|
||||
self.components = []
|
||||
self.default = "n/a"
|
||||
|
@ -58,10 +61,19 @@ class Evidence:
|
|||
if a:
|
||||
self.owner = a.owner
|
||||
|
||||
def get_phid(self):
|
||||
if not self.doc:
|
||||
self.get_doc()
|
||||
|
||||
return hashlib.sha3_256(json.dumps(self.doc)).hexdigest()
|
||||
|
||||
def get_doc(self):
|
||||
self.doc = {}
|
||||
self.inxi = None
|
||||
|
||||
if not self.owner:
|
||||
self.get_owner()
|
||||
|
||||
qry = 'uuid:"{}"'.format(self.uuid)
|
||||
matches = search(self.owner, qry, limit=1)
|
||||
if matches and matches.size() < 0:
|
||||
|
@ -70,9 +82,36 @@ class Evidence:
|
|||
for xa in matches:
|
||||
self.doc = json.loads(xa.document.get_data())
|
||||
|
||||
if not self.is_legacy():
|
||||
if self.is_legacy():
|
||||
return
|
||||
|
||||
if self.doc.get("credentialSubject"):
|
||||
for ev in self.doc["evidence"]:
|
||||
if "dmidecode" == ev.get("operation"):
|
||||
dmidecode_raw = ev["output"]
|
||||
if "inxi" == ev.get("operation"):
|
||||
self.inxi = ev["output"]
|
||||
else:
|
||||
dmidecode_raw = self.doc["data"]["dmidecode"]
|
||||
inxi_raw = self.doc.get("data", {}).get("inxi")
|
||||
self.dmi = DMIParse(dmidecode_raw)
|
||||
try:
|
||||
self.inxi = json.loads(inxi_raw)
|
||||
except Exception:
|
||||
pass
|
||||
if self.inxi:
|
||||
try:
|
||||
machine = get_inxi_key(self.inxi, 'Machine')
|
||||
for m in machine:
|
||||
system = get_inxi(m, "System")
|
||||
if system:
|
||||
self.device_manufacturer = system
|
||||
self.device_model = get_inxi(m, "product")
|
||||
self.device_serial_number = get_inxi(m, "serial")
|
||||
self.device_chassis = get_inxi(m, "Type")
|
||||
self.device_version = get_inxi(m, "v")
|
||||
except Exception:
|
||||
return
|
||||
|
||||
def get_time(self):
|
||||
if not self.doc:
|
||||
|
@ -96,7 +135,10 @@ class Evidence:
|
|||
return list(self.doc.get('kv').values())[0]
|
||||
|
||||
if self.is_legacy():
|
||||
return self.doc['device']['manufacturer']
|
||||
return self.doc.get('device', {}).get('manufacturer', '')
|
||||
|
||||
if self.inxi:
|
||||
return self.device_manufacturer
|
||||
|
||||
return self.dmi.manufacturer().strip()
|
||||
|
||||
|
@ -108,13 +150,19 @@ class Evidence:
|
|||
return list(self.doc.get('kv').values())[1]
|
||||
|
||||
if self.is_legacy():
|
||||
return self.doc['device']['model']
|
||||
return self.doc.get('device', {}).get('model', '')
|
||||
|
||||
if self.inxi:
|
||||
return self.device_model
|
||||
|
||||
return self.dmi.model().strip()
|
||||
|
||||
def get_chassis(self):
|
||||
if self.is_legacy():
|
||||
return self.doc['device']['model']
|
||||
return self.doc.get('device', {}).get('model', '')
|
||||
|
||||
if self.inxi:
|
||||
return self.device_chassis
|
||||
|
||||
chassis = self.dmi.get("Chassis")[0].get("Type", '_virtual')
|
||||
lower_type = chassis.lower()
|
||||
|
@ -126,9 +174,19 @@ class Evidence:
|
|||
|
||||
def get_serial_number(self):
|
||||
if self.is_legacy():
|
||||
return self.doc['device']['serialNumber']
|
||||
return self.doc.get('device', {}).get('serialNumber', '')
|
||||
|
||||
if self.inxi:
|
||||
return self.device_serial_number
|
||||
|
||||
return self.dmi.serial_number().strip()
|
||||
|
||||
def get_version(self):
|
||||
if self.inxi:
|
||||
return self.device_version
|
||||
|
||||
return ""
|
||||
|
||||
@classmethod
|
||||
def get_all(cls, user):
|
||||
return Annotation.objects.filter(
|
||||
|
@ -138,10 +196,12 @@ class Evidence:
|
|||
).order_by("-created").values_list("uuid", "created").distinct()
|
||||
|
||||
def set_components(self):
|
||||
snapshot = ParseSnapshot(self.doc).snapshot_json
|
||||
self.components = snapshot['components']
|
||||
self.components = ParseSnapshot(self.doc).components
|
||||
|
||||
def is_legacy(self):
|
||||
if self.doc.get("credentialSubject"):
|
||||
return False
|
||||
|
||||
return self.doc.get("software") != "workbench-script"
|
||||
|
||||
def is_web_snapshot(self):
|
||||
|
|
64
evidence/normal_parse.py
Normal file
64
evidence/normal_parse.py
Normal file
|
@ -0,0 +1,64 @@
|
|||
import json
|
||||
import logging
|
||||
|
||||
from evidence.mixin_parse import BuildMix
|
||||
from evidence.normal_parse_details import get_inxi_key, get_inxi, ParseSnapshot
|
||||
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
|
||||
def get_mac(inxi):
|
||||
nets = get_inxi_key(inxi, "Network")
|
||||
networks = [(nets[i], nets[i + 1]) for i in range(0, len(nets) - 1, 2)]
|
||||
|
||||
for n, iface in networks:
|
||||
if get_inxi(n, "port"):
|
||||
return get_inxi(iface, 'mac')
|
||||
|
||||
|
||||
class Build(BuildMix):
|
||||
|
||||
def get_details(self):
|
||||
self.from_credential()
|
||||
try:
|
||||
self.inxi = self.json["data"]["inxi"]
|
||||
if isinstance(self.inxi, str):
|
||||
self.inxi = json.loads(self.inxi)
|
||||
except Exception:
|
||||
logger.error("No inxi in snapshot %s", self.uuid)
|
||||
return ""
|
||||
|
||||
machine = get_inxi_key(self.inxi, 'Machine')
|
||||
for m in machine:
|
||||
system = get_inxi(m, "System")
|
||||
if system:
|
||||
self.manufacturer = system
|
||||
self.model = get_inxi(m, "product")
|
||||
self.serial_number = get_inxi(m, "serial")
|
||||
self.chassis = get_inxi(m, "Type")
|
||||
else:
|
||||
self.sku = get_inxi(m, "part-nu")
|
||||
|
||||
self.mac = get_mac(self.inxi) or ""
|
||||
if not self.mac:
|
||||
txt = "Could not retrieve MAC address in snapshot %s"
|
||||
logger.warning(txt, self.uuid)
|
||||
|
||||
def from_credential(self):
|
||||
if not self.json.get("credentialSubject"):
|
||||
return
|
||||
|
||||
self.uuid = self.json.get("credentialSubject", {}).get("uuid")
|
||||
self.json.update(self.json["credentialSubject"])
|
||||
if self.json.get("evidence"):
|
||||
self.json["data"] = {}
|
||||
for ev in self.json["evidence"]:
|
||||
k = ev.get("operation")
|
||||
if not k:
|
||||
continue
|
||||
self.json["data"][k] = ev.get("output")
|
||||
|
||||
def _get_components(self):
|
||||
data = ParseSnapshot(self.json)
|
||||
self.components = data.components
|
402
evidence/normal_parse_details.py
Normal file
402
evidence/normal_parse_details.py
Normal file
|
@ -0,0 +1,402 @@
|
|||
import json
|
||||
import logging
|
||||
|
||||
from dmidecode import DMIParse
|
||||
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
|
||||
def get_inxi_key(inxi, component):
|
||||
for n in inxi:
|
||||
for k, v in n.items():
|
||||
if component in k:
|
||||
return v
|
||||
|
||||
|
||||
def get_inxi(n, name):
|
||||
for k, v in n.items():
|
||||
if f"#{name}" in k:
|
||||
return v
|
||||
|
||||
return ""
|
||||
|
||||
|
||||
class ParseSnapshot:
|
||||
def __init__(self, snapshot, default="n/a"):
|
||||
self.default = default
|
||||
self.dmidecode_raw = snapshot.get("data", {}).get("dmidecode", "{}")
|
||||
self.smart_raw = snapshot.get("data", {}).get("smartctl", [])
|
||||
self.inxi_raw = snapshot.get("data", {}).get("inxi", "") or ""
|
||||
for ev in snapshot.get("evidence", []):
|
||||
if "dmidecode" == ev.get("operation"):
|
||||
self.dmidecode_raw = ev["output"]
|
||||
if "inxi" == ev.get("operation"):
|
||||
self.inxi_raw = ev["output"]
|
||||
if "smartctl" == ev.get("operation"):
|
||||
self.smart_raw = ev["output"]
|
||||
data = snapshot
|
||||
if snapshot.get("credentialSubject"):
|
||||
data = snapshot["credentialSubject"]
|
||||
|
||||
self.device = {"actions": []}
|
||||
self.components = []
|
||||
|
||||
self.dmi = DMIParse(self.dmidecode_raw)
|
||||
self.smart = self.loads(self.smart_raw)
|
||||
self.inxi = self.loads(self.inxi_raw)
|
||||
|
||||
self.set_computer()
|
||||
self.set_components()
|
||||
self.snapshot_json = {
|
||||
"type": "Snapshot",
|
||||
"device": self.device,
|
||||
"software": data["software"],
|
||||
"components": self.components,
|
||||
"uuid": data['uuid'],
|
||||
"endTime": data["timestamp"],
|
||||
"elapsed": 1,
|
||||
}
|
||||
|
||||
def set_computer(self):
|
||||
machine = get_inxi_key(self.inxi, 'Machine') or []
|
||||
for m in machine:
|
||||
system = get_inxi(m, "System")
|
||||
if system:
|
||||
self.device['manufacturer'] = system
|
||||
self.device['model'] = get_inxi(m, "product")
|
||||
self.device['serialNumber'] = get_inxi(m, "serial")
|
||||
self.device['type'] = get_inxi(m, "Type")
|
||||
self.device['chassis'] = self.device['type']
|
||||
self.device['version'] = get_inxi(m, "v")
|
||||
else:
|
||||
self.device['system_uuid'] = get_inxi(m, "uuid")
|
||||
self.device['sku'] = get_inxi(m, "part-nu")
|
||||
|
||||
def set_components(self):
|
||||
self.get_mother_board()
|
||||
self.get_cpu()
|
||||
self.get_ram()
|
||||
self.get_graphic()
|
||||
self.get_display()
|
||||
self.get_networks()
|
||||
self.get_sound_card()
|
||||
self.get_data_storage()
|
||||
self.get_battery()
|
||||
|
||||
def get_mother_board(self):
|
||||
machine = get_inxi_key(self.inxi, 'Machine') or []
|
||||
mb = {"type": "Motherboard",}
|
||||
for m in machine:
|
||||
bios_date = get_inxi(m, "date")
|
||||
if not bios_date:
|
||||
continue
|
||||
mb["manufacturer"] = get_inxi(m, "Mobo")
|
||||
mb["model"] = get_inxi(m, "model")
|
||||
mb["serialNumber"] = get_inxi(m, "serial")
|
||||
mb["version"] = get_inxi(m, "v")
|
||||
mb["biosDate"] = bios_date
|
||||
mb["biosVersion"] = self.get_bios_version()
|
||||
mb["firewire"]: self.get_firmware_num()
|
||||
mb["pcmcia"]: self.get_pcmcia_num()
|
||||
mb["serial"]: self.get_serial_num()
|
||||
mb["usb"]: self.get_usb_num()
|
||||
|
||||
self.get_ram_slots(mb)
|
||||
|
||||
self.components.append(mb)
|
||||
|
||||
def get_ram_slots(self, mb):
|
||||
memory = get_inxi_key(self.inxi, 'Memory') or []
|
||||
for m in memory:
|
||||
slots = get_inxi(m, "slots")
|
||||
if not slots:
|
||||
continue
|
||||
mb["slots"] = slots
|
||||
mb["ramSlots"] = get_inxi(m, "modules")
|
||||
mb["ramMaxSize"] = get_inxi(m, "capacity")
|
||||
|
||||
|
||||
def get_cpu(self):
|
||||
cpu = get_inxi_key(self.inxi, 'CPU') or []
|
||||
cp = {"type": "Processor"}
|
||||
vulnerabilities = []
|
||||
for c in cpu:
|
||||
base = get_inxi(c, "model")
|
||||
if base:
|
||||
cp["model"] = get_inxi(c, "model")
|
||||
cp["arch"] = get_inxi(c, "arch")
|
||||
cp["bits"] = get_inxi(c, "bits")
|
||||
cp["gen"] = get_inxi(c, "gen")
|
||||
cp["family"] = get_inxi(c, "family")
|
||||
cp["date"] = get_inxi(c, "built")
|
||||
continue
|
||||
des = get_inxi(c, "L1")
|
||||
if des:
|
||||
cp["L1"] = des
|
||||
cp["L2"] = get_inxi(c, "L2")
|
||||
cp["L3"] = get_inxi(c, "L3")
|
||||
cp["cpus"] = get_inxi(c, "cpus")
|
||||
cp["cores"] = get_inxi(c, "cores")
|
||||
cp["threads"] = get_inxi(c, "threads")
|
||||
continue
|
||||
bogo = get_inxi(c, "bogomips")
|
||||
if bogo:
|
||||
cp["bogomips"] = bogo
|
||||
cp["base/boost"] = get_inxi(c, "base/boost")
|
||||
cp["min/max"] = get_inxi(c, "min/max")
|
||||
cp["ext-clock"] = get_inxi(c, "ext-clock")
|
||||
cp["volts"] = get_inxi(c, "volts")
|
||||
continue
|
||||
ctype = get_inxi(c, "Type")
|
||||
if ctype:
|
||||
v = {"Type": ctype}
|
||||
status = get_inxi(c, "status")
|
||||
if status:
|
||||
v["status"] = status
|
||||
mitigation = get_inxi(c, "mitigation")
|
||||
if mitigation:
|
||||
v["mitigation"] = mitigation
|
||||
vulnerabilities.append(v)
|
||||
|
||||
self.components.append(cp)
|
||||
|
||||
|
||||
def get_ram(self):
|
||||
memory = get_inxi_key(self.inxi, 'Memory') or []
|
||||
mem = {"type": "RamModule"}
|
||||
|
||||
for m in memory:
|
||||
base = get_inxi(m, "System RAM")
|
||||
if base:
|
||||
mem["size"] = get_inxi(m, "total")
|
||||
slot = get_inxi(m, "manufacturer")
|
||||
if slot:
|
||||
mem["manufacturer"] = slot
|
||||
mem["model"] = get_inxi(m, "part-no")
|
||||
mem["serialNumber"] = get_inxi(m, "serial")
|
||||
mem["speed"] = get_inxi(m, "speed")
|
||||
mem["bits"] = get_inxi(m, "data")
|
||||
mem["interface"] = get_inxi(m, "type")
|
||||
module = get_inxi(m, "modules")
|
||||
if module:
|
||||
mem["modules"] = module
|
||||
|
||||
self.components.append(mem)
|
||||
|
||||
def get_graphic(self):
|
||||
graphics = get_inxi_key(self.inxi, 'Graphics') or []
|
||||
|
||||
for c in graphics:
|
||||
if not get_inxi(c, "Device") or not get_inxi(c, "vendor"):
|
||||
continue
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"type": "GraphicCard",
|
||||
"memory": self.get_memory_video(c),
|
||||
"manufacturer": get_inxi(c, "vendor"),
|
||||
"model": get_inxi(c, "Device"),
|
||||
"arch": get_inxi(c, "arch"),
|
||||
"serialNumber": get_inxi(c, "serial"),
|
||||
"integrated": True if get_inxi(c, "port") else False
|
||||
}
|
||||
)
|
||||
|
||||
def get_battery(self):
|
||||
bats = get_inxi_key(self.inxi, 'Battery') or []
|
||||
for b in bats:
|
||||
self.components.append(
|
||||
{
|
||||
"type": "Battery",
|
||||
"model": get_inxi(b, "model"),
|
||||
"serialNumber": get_inxi(b, "serial"),
|
||||
"condition": get_inxi(b, "condition"),
|
||||
"cycles": get_inxi(b, "cycles"),
|
||||
"volts": get_inxi(b, "volts")
|
||||
}
|
||||
)
|
||||
|
||||
def get_memory_video(self, c):
|
||||
memory = get_inxi_key(self.inxi, 'Memory') or []
|
||||
|
||||
for m in memory:
|
||||
igpu = get_inxi(m, "igpu")
|
||||
agpu = get_inxi(m, "agpu")
|
||||
ngpu = get_inxi(m, "ngpu")
|
||||
gpu = get_inxi(m, "gpu")
|
||||
if igpu or agpu or gpu or ngpu:
|
||||
return igpu or agpu or gpu or ngpu
|
||||
|
||||
return self.default
|
||||
|
||||
def get_data_storage(self):
|
||||
hdds= get_inxi_key(self.inxi, 'Drives') or []
|
||||
for d in hdds:
|
||||
usb = get_inxi(d, "type")
|
||||
if usb == "USB":
|
||||
continue
|
||||
|
||||
serial = get_inxi(d, "serial")
|
||||
if serial:
|
||||
hd = {
|
||||
"type": "Storage",
|
||||
"manufacturer": get_inxi(d, "vendor"),
|
||||
"model": get_inxi(d, "model"),
|
||||
"serialNumber": get_inxi(d, "serial"),
|
||||
"size": get_inxi(d, "size"),
|
||||
"speed": get_inxi(d, "speed"),
|
||||
"interface": get_inxi(d, "tech"),
|
||||
"firmware": get_inxi(d, "fw-rev")
|
||||
}
|
||||
rpm = get_inxi(d, "rpm")
|
||||
if rpm:
|
||||
hd["rpm"] = rpm
|
||||
|
||||
family = get_inxi(d, "family")
|
||||
if family:
|
||||
hd["family"] = family
|
||||
|
||||
sata = get_inxi(d, "sata")
|
||||
if sata:
|
||||
hd["sata"] = sata
|
||||
|
||||
continue
|
||||
|
||||
|
||||
cycles = get_inxi(d, "cycles")
|
||||
if cycles:
|
||||
hd['cycles'] = cycles
|
||||
hd["health"] = get_inxi(d, "health")
|
||||
hd["time of used"] = get_inxi(d, "on")
|
||||
hd["read used"] = get_inxi(d, "read-units")
|
||||
hd["written used"] = get_inxi(d, "written-units")
|
||||
|
||||
self.components.append(hd)
|
||||
continue
|
||||
|
||||
hd = {}
|
||||
|
||||
def sanitize(self, action):
|
||||
return []
|
||||
|
||||
def get_networks(self):
|
||||
nets = get_inxi_key(self.inxi, "Network") or []
|
||||
networks = [(nets[i], nets[i + 1]) for i in range(0, len(nets) - 1, 2)]
|
||||
|
||||
for n, iface in networks:
|
||||
model = get_inxi(n, "Device")
|
||||
if not model:
|
||||
continue
|
||||
|
||||
interface = ''
|
||||
for k in n.keys():
|
||||
if "port" in k:
|
||||
interface = "Integrated"
|
||||
if "pcie" in k:
|
||||
interface = "PciExpress"
|
||||
if get_inxi(n, "type") == "USB":
|
||||
interface = "USB"
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"type": "NetworkAdapter",
|
||||
"model": model,
|
||||
"manufacturer": get_inxi(n, 'vendor'),
|
||||
"serialNumber": get_inxi(iface, 'mac'),
|
||||
"speed": get_inxi(n, "speed"),
|
||||
"interface": interface,
|
||||
}
|
||||
)
|
||||
|
||||
def get_sound_card(self):
|
||||
audio = get_inxi_key(self.inxi, "Audio") or []
|
||||
|
||||
for c in audio:
|
||||
model = get_inxi(c, "Device")
|
||||
if not model:
|
||||
continue
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"type": "SoundCard",
|
||||
"model": model,
|
||||
"manufacturer": get_inxi(c, 'vendor'),
|
||||
"serialNumber": get_inxi(c, 'serial'),
|
||||
}
|
||||
)
|
||||
|
||||
def get_display(self):
|
||||
graphics = get_inxi_key(self.inxi, "Graphics") or []
|
||||
for c in graphics:
|
||||
if not get_inxi(c, "Monitor"):
|
||||
continue
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"type": "Display",
|
||||
"model": get_inxi(c, "model"),
|
||||
"manufacturer": get_inxi(c, "vendor"),
|
||||
"serialNumber": get_inxi(c, "serial"),
|
||||
'size': get_inxi(c, "size"),
|
||||
'diagonal': get_inxi(c, "diag"),
|
||||
'resolution': get_inxi(c, "res"),
|
||||
"date": get_inxi(c, "built"),
|
||||
'ratio': get_inxi(c, "ratio"),
|
||||
}
|
||||
)
|
||||
|
||||
def get_usb_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "USB" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_serial_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "SERIAL" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_firmware_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "FIRMWARE" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_pcmcia_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "PCMCIA" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_bios_version(self):
|
||||
return self.dmi.get("BIOS")[0].get("BIOS Revision", '1')
|
||||
|
||||
def loads(self, x):
|
||||
if isinstance(x, str):
|
||||
try:
|
||||
return json.loads(x)
|
||||
except Exception as ss:
|
||||
logger.warning("%s", ss)
|
||||
return {}
|
||||
return x
|
||||
|
||||
def errors(self, txt=None):
|
||||
if not txt:
|
||||
return self._errors
|
||||
|
||||
logger.error(txt)
|
||||
self._errors.append("%s", txt)
|
22
evidence/old_parse.py
Normal file
22
evidence/old_parse.py
Normal file
|
@ -0,0 +1,22 @@
|
|||
import logging
|
||||
|
||||
from evidence.mixin_parse import BuildMix
|
||||
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
|
||||
class Build(BuildMix):
|
||||
# This parse is for get info from snapshots created with old workbench
|
||||
# normaly is worbench 11
|
||||
|
||||
def get_details(self):
|
||||
device = self.json.get('device', {})
|
||||
self.manufacturer = device.get("manufacturer", '')
|
||||
self.model = device.get("model", '')
|
||||
self.chassis = device.get("chassis", '')
|
||||
self.serial_number = device.get("serialNumber", '')
|
||||
self.sku = device.get("sku", '')
|
||||
|
||||
def _get_components(self):
|
||||
self.components = self.json.get("components", [])
|
13
evidence/old_parse_details.py
Normal file
13
evidence/old_parse_details.py
Normal file
|
@ -0,0 +1,13 @@
|
|||
import logging
|
||||
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
|
||||
class ParseSnapshot:
|
||||
def __init__(self, snapshot, default="n/a"):
|
||||
self.default = default
|
||||
self.snapshot_json = snapshot
|
||||
|
||||
self.device = snapshot.get("device")
|
||||
self.components = snapshot.get("components")
|
|
@ -2,44 +2,125 @@ import json
|
|||
import hashlib
|
||||
import logging
|
||||
|
||||
from dmidecode import DMIParse
|
||||
from json_repair import repair_json
|
||||
from evidence.parse_details import get_lshw_child
|
||||
from evidence import legacy_parse
|
||||
from evidence import old_parse
|
||||
from evidence import normal_parse
|
||||
from evidence.parse_details import ParseSnapshot
|
||||
|
||||
from evidence.models import Annotation
|
||||
from evidence.xapian import index
|
||||
from utils.constants import CHASSIS_DH
|
||||
from evidence.normal_parse_details import get_inxi_key, get_inxi
|
||||
from django.conf import settings
|
||||
|
||||
if settings.DPP:
|
||||
from dpp.api_dlt import register_device_dlt, register_passport_dlt
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
def get_mac(lshw):
|
||||
try:
|
||||
if type(lshw) is dict:
|
||||
hw = lshw
|
||||
else:
|
||||
hw = json.loads(lshw)
|
||||
except json.decoder.JSONDecodeError:
|
||||
hw = json.loads(repair_json(lshw))
|
||||
|
||||
nets = []
|
||||
get_lshw_child(hw, nets, 'network')
|
||||
|
||||
nets_sorted = sorted(nets, key=lambda x: x['businfo'])
|
||||
|
||||
if nets_sorted:
|
||||
mac = nets_sorted[0]['serial']
|
||||
logger.debug("The snapshot has the following MAC: %s" , mac)
|
||||
return mac
|
||||
|
||||
def get_mac(inxi):
|
||||
nets = get_inxi_key(inxi, "Network")
|
||||
networks = [(nets[i], nets[i + 1]) for i in range(0, len(nets) - 1, 2)]
|
||||
|
||||
for n, iface in networks:
|
||||
if get_inxi(n, "port"):
|
||||
return get_inxi(iface, 'mac')
|
||||
|
||||
class Build:
|
||||
def __init__(self, evidence_json, user, check=False):
|
||||
self.json = evidence_json
|
||||
"""
|
||||
This Build do the save in xapian as document, in Annotations and do
|
||||
register in dlt if is configured for that.
|
||||
|
||||
We have 4 cases for parser diferents snapshots than come from workbench.
|
||||
1) worbench 11 is old_parse.
|
||||
2) legacy is the worbench-script when create a snapshot for devicehub-teal
|
||||
3) some snapshots come as a credential. In this case is parsed as normal_parse
|
||||
4) normal snapshot from worbench-script is the most basic and is parsed as normal_parse
|
||||
"""
|
||||
self.evidence = evidence_json.copy()
|
||||
self.uuid = self.evidence.get('uuid')
|
||||
self.user = user
|
||||
|
||||
if evidence_json.get("credentialSubject"):
|
||||
self.build = normal_parse.Build(evidence_json)
|
||||
self.uuid = evidence_json.get("credentialSubject", {}).get("uuid")
|
||||
elif evidence_json.get("software") != "workbench-script":
|
||||
self.build = old_parse.Build(evidence_json)
|
||||
elif evidence_json.get("data",{}).get("lshw"):
|
||||
self.build = legacy_parse.Build(evidence_json)
|
||||
else:
|
||||
self.build = normal_parse.Build(evidence_json)
|
||||
|
||||
if check:
|
||||
return
|
||||
|
||||
self.index()
|
||||
self.create_annotations()
|
||||
if settings.DPP:
|
||||
self.register_device_dlt()
|
||||
|
||||
def index(self):
|
||||
snap = json.dumps(self.evidence)
|
||||
index(self.user.institution, self.uuid, snap)
|
||||
|
||||
def create_annotations(self):
|
||||
annotation = Annotation.objects.filter(
|
||||
uuid=self.uuid,
|
||||
owner=self.user.institution,
|
||||
type=Annotation.Type.SYSTEM,
|
||||
)
|
||||
|
||||
if annotation:
|
||||
txt = "Warning: Snapshot %s already registered (annotation exists)"
|
||||
logger.warning(txt, self.uuid)
|
||||
return
|
||||
|
||||
for k, v in self.build.algorithms.items():
|
||||
Annotation.objects.create(
|
||||
uuid=self.uuid,
|
||||
owner=self.user.institution,
|
||||
user=self.user,
|
||||
type=Annotation.Type.SYSTEM,
|
||||
key=k,
|
||||
value=self.sign(v)
|
||||
)
|
||||
|
||||
def sign(self, doc):
|
||||
return hashlib.sha3_256(doc.encode()).hexdigest()
|
||||
|
||||
def register_device_dlt(self):
|
||||
legacy_dpp = self.build.algorithms.get('legacy_dpp')
|
||||
chid = self.sign(legacy_dpp)
|
||||
phid = self.sign(json.dumps(self.build.get_doc()))
|
||||
register_device_dlt(chid, phid, self.uuid, self.user)
|
||||
register_passport_dlt(chid, phid, self.uuid, self.user)
|
||||
|
||||
|
||||
class Build2:
|
||||
def __init__(self, evidence_json, user, check=False):
|
||||
if evidence_json.get("data",{}).get("lshw"):
|
||||
if evidence_json.get("software") == "workbench-script":
|
||||
return legacy_parse.Build(evidence_json, user, check=check)
|
||||
|
||||
self.evidence = evidence_json.copy()
|
||||
self.json = evidence_json.copy()
|
||||
|
||||
if evidence_json.get("credentialSubject"):
|
||||
self.json.update(evidence_json["credentialSubject"])
|
||||
if evidence_json.get("evidence"):
|
||||
self.json["data"] = {}
|
||||
for ev in evidence_json["evidence"]:
|
||||
k = ev.get("operation")
|
||||
if not k:
|
||||
continue
|
||||
self.json["data"][k] = ev.get("output")
|
||||
|
||||
self.uuid = self.json['uuid']
|
||||
self.user = user
|
||||
self.hid = None
|
||||
self.chid = None
|
||||
self.phid = self.get_signature(self.json)
|
||||
self.generate_chids()
|
||||
|
||||
if check:
|
||||
|
@ -47,14 +128,17 @@ class Build:
|
|||
|
||||
self.index()
|
||||
self.create_annotations()
|
||||
if settings.DPP:
|
||||
self.register_device_dlt()
|
||||
|
||||
def index(self):
|
||||
snap = json.dumps(self.json)
|
||||
snap = json.dumps(self.evidence)
|
||||
index(self.user.institution, self.uuid, snap)
|
||||
|
||||
def generate_chids(self):
|
||||
self.algorithms = {
|
||||
'hidalgo1': self.get_hid_14(),
|
||||
'legacy_dpp': self.get_chid_dpp(),
|
||||
}
|
||||
|
||||
def get_hid_14(self):
|
||||
|
@ -69,8 +153,51 @@ class Build:
|
|||
sku = device.get("sku", '')
|
||||
hid = f"{manufacturer}{model}{chassis}{serial_number}{sku}"
|
||||
|
||||
self.chid = hashlib.sha3_256(hid.encode()).hexdigest()
|
||||
return self.chid
|
||||
|
||||
return hashlib.sha3_256(hid.encode()).hexdigest()
|
||||
def get_chid_dpp(self):
|
||||
if self.json.get("software") == "workbench-script":
|
||||
device = ParseSnapshot(self.json).device
|
||||
else:
|
||||
device = self.json['device']
|
||||
|
||||
hid = self.get_id_hw_dpp(device)
|
||||
self.chid = hashlib.sha3_256(hid.encode("utf-8")).hexdigest()
|
||||
return self.chid
|
||||
|
||||
def get_id_hw_dpp(self, d):
|
||||
manufacturer = d.get("manufacturer", '')
|
||||
model = d.get("model", '')
|
||||
chassis = d.get("chassis", '')
|
||||
serial_number = d.get("serialNumber", '')
|
||||
sku = d.get("sku", '')
|
||||
typ = d.get("type", '')
|
||||
version = d.get("version", '')
|
||||
|
||||
return f"{manufacturer}{model}{chassis}{serial_number}{sku}{typ}{version}"
|
||||
|
||||
def get_phid(self):
|
||||
if self.json.get("software") == "workbench-script":
|
||||
data = ParseSnapshot(self.json)
|
||||
self.device = data.device
|
||||
self.components = data.components
|
||||
else:
|
||||
self.device = self.json.get("device")
|
||||
self.components = self.json.get("components", [])
|
||||
|
||||
self.device.pop("actions", None)
|
||||
for c in self.components:
|
||||
c.pop("actions", None)
|
||||
|
||||
device = self.get_id_hw_dpp(self.device)
|
||||
components = sorted(self.components, key=lambda x: x.get("type"))
|
||||
doc = [("computer", device)]
|
||||
|
||||
for c in components:
|
||||
doc.append((c.get("type"), self.get_id_hw_dpp(c)))
|
||||
|
||||
return doc
|
||||
|
||||
def create_annotations(self):
|
||||
annotation = Annotation.objects.filter(
|
||||
|
@ -94,38 +221,39 @@ class Build:
|
|||
value=v
|
||||
)
|
||||
|
||||
def get_chassis_dh(self):
|
||||
chassis = self.get_chassis()
|
||||
lower_type = chassis.lower()
|
||||
for k, v in CHASSIS_DH.items():
|
||||
if lower_type in v:
|
||||
return k
|
||||
return self.default
|
||||
|
||||
def get_sku(self):
|
||||
return self.dmi.get("System")[0].get("SKU Number", "n/a").strip()
|
||||
|
||||
def get_chassis(self):
|
||||
return self.dmi.get("Chassis")[0].get("Type", '_virtual')
|
||||
|
||||
def get_hid(self, snapshot):
|
||||
dmidecode_raw = snapshot["data"]["dmidecode"]
|
||||
self.dmi = DMIParse(dmidecode_raw)
|
||||
try:
|
||||
self.inxi = self.json["data"]["inxi"]
|
||||
if isinstance(self.inxi, str):
|
||||
self.inxi = json.loads(self.inxi)
|
||||
except Exception:
|
||||
logger.error("No inxi in snapshot %s", self.uuid)
|
||||
return ""
|
||||
|
||||
manufacturer = self.dmi.manufacturer().strip()
|
||||
model = self.dmi.model().strip()
|
||||
chassis = self.get_chassis_dh()
|
||||
serial_number = self.dmi.serial_number()
|
||||
sku = self.get_sku()
|
||||
machine = get_inxi_key(self.inxi, 'Machine')
|
||||
for m in machine:
|
||||
system = get_inxi(m, "System")
|
||||
if system:
|
||||
manufacturer = system
|
||||
model = get_inxi(m, "product")
|
||||
serial_number = get_inxi(m, "serial")
|
||||
chassis = get_inxi(m, "Type")
|
||||
else:
|
||||
sku = get_inxi(m, "part-nu")
|
||||
|
||||
if not snapshot["data"].get('lshw'):
|
||||
return f"{manufacturer}{model}{chassis}{serial_number}{sku}"
|
||||
|
||||
lshw = snapshot["data"]["lshw"]
|
||||
# mac = get_mac2(hwinfo_raw) or ""
|
||||
mac = get_mac(lshw) or ""
|
||||
mac = get_mac(self.inxi) or ""
|
||||
if not mac:
|
||||
txt = "Could not retrieve MAC address in snapshot %s"
|
||||
logger.warning(txt, snapshot['uuid'])
|
||||
return f"{manufacturer}{model}{chassis}{serial_number}{sku}"
|
||||
|
||||
return f"{manufacturer}{model}{chassis}{serial_number}{sku}{mac}"
|
||||
|
||||
def get_signature(self, doc):
|
||||
return hashlib.sha3_256(json.dumps(doc).encode()).hexdigest()
|
||||
|
||||
def register_device_dlt(self):
|
||||
chid = self.algorithms.get('legacy_dpp')
|
||||
phid = self.get_signature(self.get_phid())
|
||||
register_device_dlt(chid, phid, self.uuid, self.user)
|
||||
register_passport_dlt(chid, phid, self.uuid, self.user)
|
||||
|
|
|
@ -1,505 +1,38 @@
|
|||
import json
|
||||
import logging
|
||||
import numpy as np
|
||||
|
||||
from datetime import datetime
|
||||
from dmidecode import DMIParse
|
||||
from json_repair import repair_json
|
||||
|
||||
from utils.constants import CHASSIS_DH, DATASTORAGEINTERFACE
|
||||
from evidence import (
|
||||
legacy_parse_details,
|
||||
normal_parse_details,
|
||||
old_parse_details
|
||||
)
|
||||
|
||||
|
||||
logger = logging.getLogger('django')
|
||||
|
||||
|
||||
def get_lshw_child(child, nets, component):
|
||||
if child.get('id') == component:
|
||||
nets.append(child)
|
||||
if child.get('children'):
|
||||
[get_lshw_child(x, nets, component) for x in child['children']]
|
||||
|
||||
|
||||
class ParseSnapshot:
|
||||
def __init__(self, snapshot, default="n/a"):
|
||||
self.default = default
|
||||
self.dmidecode_raw = snapshot["data"].get("dmidecode", "{}")
|
||||
self.smart_raw = snapshot["data"].get("disks", [])
|
||||
self.hwinfo_raw = snapshot["data"].get("hwinfo", "")
|
||||
self.lshw_raw = snapshot["data"].get("lshw", {}) or {}
|
||||
self.lscpi_raw = snapshot["data"].get("lspci", "")
|
||||
self.device = {"actions": []}
|
||||
self.components = []
|
||||
self.monitors = []
|
||||
|
||||
self.dmi = DMIParse(self.dmidecode_raw)
|
||||
self.smart = self.loads(self.smart_raw)
|
||||
self.lshw = self.loads(self.lshw_raw)
|
||||
self.hwinfo = self.parse_hwinfo()
|
||||
|
||||
self.set_computer()
|
||||
self.get_hwinfo_monitors()
|
||||
self.set_components()
|
||||
self.snapshot_json = {
|
||||
"type": "Snapshot",
|
||||
"device": self.device,
|
||||
"software": snapshot["software"],
|
||||
"components": self.components,
|
||||
"uuid": snapshot['uuid'],
|
||||
"version": snapshot['version'],
|
||||
"endTime": snapshot["timestamp"],
|
||||
"elapsed": 1,
|
||||
}
|
||||
|
||||
def set_computer(self):
|
||||
self.device['manufacturer'] = self.dmi.manufacturer().strip()
|
||||
self.device['model'] = self.dmi.model().strip()
|
||||
self.device['serialNumber'] = self.dmi.serial_number()
|
||||
self.device['type'] = self.get_type()
|
||||
self.device['sku'] = self.get_sku()
|
||||
self.device['version'] = self.get_version()
|
||||
self.device['system_uuid'] = self.get_uuid()
|
||||
self.device['family'] = self.get_family()
|
||||
self.device['chassis'] = self.get_chassis_dh()
|
||||
|
||||
def set_components(self):
|
||||
self.get_cpu()
|
||||
self.get_ram()
|
||||
self.get_mother_board()
|
||||
self.get_graphic()
|
||||
self.get_data_storage()
|
||||
self.get_display()
|
||||
self.get_sound_card()
|
||||
self.get_networks()
|
||||
|
||||
def get_cpu(self):
|
||||
for cpu in self.dmi.get('Processor'):
|
||||
serial = cpu.get('Serial Number')
|
||||
if serial == 'Not Specified' or not serial:
|
||||
serial = cpu.get('ID').replace(' ', '')
|
||||
self.components.append(
|
||||
{
|
||||
"actions": [],
|
||||
"type": "Processor",
|
||||
"speed": self.get_cpu_speed(cpu),
|
||||
"cores": int(cpu.get('Core Count', 1)),
|
||||
"model": cpu.get('Version'),
|
||||
"threads": int(cpu.get('Thread Count', 1)),
|
||||
"manufacturer": cpu.get('Manufacturer'),
|
||||
"serialNumber": serial,
|
||||
"brand": cpu.get('Family'),
|
||||
"address": self.get_cpu_address(cpu),
|
||||
"bogomips": self.get_bogomips(),
|
||||
}
|
||||
)
|
||||
|
||||
def get_ram(self):
|
||||
for ram in self.dmi.get("Memory Device"):
|
||||
if ram.get('size') == 'No Module Installed':
|
||||
continue
|
||||
if not ram.get("Speed"):
|
||||
continue
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"actions": [],
|
||||
"type": "RamModule",
|
||||
"size": self.get_ram_size(ram),
|
||||
"speed": self.get_ram_speed(ram),
|
||||
"manufacturer": ram.get("Manufacturer", self.default),
|
||||
"serialNumber": ram.get("Serial Number", self.default),
|
||||
"interface": ram.get("Type", "DDR"),
|
||||
"format": ram.get("Form Factor", "DIMM"),
|
||||
"model": ram.get("Part Number", self.default),
|
||||
}
|
||||
)
|
||||
|
||||
def get_mother_board(self):
|
||||
for moder_board in self.dmi.get("Baseboard"):
|
||||
self.components.append(
|
||||
{
|
||||
"actions": [],
|
||||
"type": "Motherboard",
|
||||
"version": moder_board.get("Version"),
|
||||
"serialNumber": moder_board.get("Serial Number", "").strip(),
|
||||
"manufacturer": moder_board.get("Manufacturer", "").strip(),
|
||||
"biosDate": self.get_bios_date(),
|
||||
"ramMaxSize": self.get_max_ram_size(),
|
||||
"ramSlots": len(self.dmi.get("Memory Device")),
|
||||
"slots": self.get_ram_slots(),
|
||||
"model": moder_board.get("Product Name", "").strip(),
|
||||
"firewire": self.get_firmware_num(),
|
||||
"pcmcia": self.get_pcmcia_num(),
|
||||
"serial": self.get_serial_num(),
|
||||
"usb": self.get_usb_num(),
|
||||
}
|
||||
)
|
||||
|
||||
def get_graphic(self):
|
||||
displays = []
|
||||
get_lshw_child(self.lshw, displays, 'display')
|
||||
|
||||
for c in displays:
|
||||
if not c['configuration'].get('driver', None):
|
||||
continue
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"actions": [],
|
||||
"type": "GraphicCard",
|
||||
"memory": self.get_memory_video(c),
|
||||
"manufacturer": c.get("vendor", self.default),
|
||||
"model": c.get("product", self.default),
|
||||
"serialNumber": c.get("serial", self.default),
|
||||
}
|
||||
)
|
||||
|
||||
def get_memory_video(self, c):
|
||||
# get info of lspci
|
||||
# pci_id = c['businfo'].split('@')[1]
|
||||
# lspci.get(pci_id) | grep size
|
||||
# lspci -v -s 00:02.0
|
||||
return None
|
||||
|
||||
def get_data_storage(self):
|
||||
for sm in self.smart:
|
||||
if sm.get('smartctl', {}).get('exit_status') == 1:
|
||||
continue
|
||||
model = sm.get('model_name')
|
||||
manufacturer = None
|
||||
hours = sm.get("power_on_time", {}).get("hours", 0)
|
||||
if model and len(model.split(" ")) > 1:
|
||||
mm = model.split(" ")
|
||||
model = mm[-1]
|
||||
manufacturer = " ".join(mm[:-1])
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"actions": self.sanitize(sm),
|
||||
"type": self.get_data_storage_type(sm),
|
||||
"model": model,
|
||||
"manufacturer": manufacturer,
|
||||
"serialNumber": sm.get('serial_number'),
|
||||
"size": self.get_data_storage_size(sm),
|
||||
"variant": sm.get("firmware_version"),
|
||||
"interface": self.get_data_storage_interface(sm),
|
||||
"hours": hours,
|
||||
}
|
||||
)
|
||||
|
||||
def sanitize(self, action):
|
||||
return []
|
||||
|
||||
def get_bogomips(self):
|
||||
if not self.hwinfo:
|
||||
return self.default
|
||||
|
||||
bogomips = 0
|
||||
for row in self.hwinfo:
|
||||
for cel in row:
|
||||
if 'BogoMips' in cel:
|
||||
try:
|
||||
bogomips += float(cel.split(":")[-1])
|
||||
except:
|
||||
pass
|
||||
return bogomips
|
||||
|
||||
def get_networks(self):
|
||||
networks = []
|
||||
get_lshw_child(self.lshw, networks, 'network')
|
||||
|
||||
for c in networks:
|
||||
capacity = c.get('capacity')
|
||||
wireless = bool(c.get('configuration', {}).get('wireless', False))
|
||||
self.components.append(
|
||||
{
|
||||
"actions": [],
|
||||
"type": "NetworkAdapter",
|
||||
"model": c.get('product'),
|
||||
"manufacturer": c.get('vendor'),
|
||||
"serialNumber": c.get('serial'),
|
||||
"speed": capacity,
|
||||
"variant": c.get('version', 1),
|
||||
"wireless": wireless or False,
|
||||
"integrated": "PCI:0000:00" in c.get("businfo", ""),
|
||||
}
|
||||
)
|
||||
|
||||
def get_sound_card(self):
|
||||
multimedias = []
|
||||
get_lshw_child(self.lshw, multimedias, 'multimedia')
|
||||
|
||||
for c in multimedias:
|
||||
self.components.append(
|
||||
{
|
||||
"actions": [],
|
||||
"type": "SoundCard",
|
||||
"model": c.get('product'),
|
||||
"manufacturer": c.get('vendor'),
|
||||
"serialNumber": c.get('serial'),
|
||||
}
|
||||
)
|
||||
|
||||
def get_display(self): # noqa: C901
|
||||
TECHS = 'CRT', 'TFT', 'LED', 'PDP', 'LCD', 'OLED', 'AMOLED'
|
||||
|
||||
for c in self.monitors:
|
||||
resolution_width, resolution_height = (None,) * 2
|
||||
refresh, serial, model, manufacturer, size = (None,) * 5
|
||||
year, week, production_date = (None,) * 3
|
||||
|
||||
for x in c:
|
||||
if "Vendor: " in x:
|
||||
manufacturer = x.split('Vendor: ')[-1].strip()
|
||||
if "Model: " in x:
|
||||
model = x.split('Model: ')[-1].strip()
|
||||
if "Serial ID: " in x:
|
||||
serial = x.split('Serial ID: ')[-1].strip()
|
||||
if " Resolution: " in x:
|
||||
rs = x.split(' Resolution: ')[-1].strip()
|
||||
if 'x' in rs:
|
||||
resolution_width, resolution_height = [
|
||||
int(r) for r in rs.split('x')
|
||||
]
|
||||
if "Frequencies: " in x:
|
||||
try:
|
||||
refresh = int(float(x.split(',')[-1].strip()[:-3]))
|
||||
except Exception:
|
||||
pass
|
||||
if 'Year of Manufacture' in x:
|
||||
year = x.split(': ')[1]
|
||||
|
||||
if 'Week of Manufacture' in x:
|
||||
week = x.split(': ')[1]
|
||||
|
||||
if "Size: " in x:
|
||||
size = self.get_size_monitor(x)
|
||||
technology = next((t for t in TECHS if t in c[0]), None)
|
||||
|
||||
if year and week:
|
||||
d = '{} {} 0'.format(year, week)
|
||||
production_date = datetime.strptime(d, '%Y %W %w').isoformat()
|
||||
|
||||
self.components.append(
|
||||
{
|
||||
"actions": [],
|
||||
"type": "Display",
|
||||
"model": model,
|
||||
"manufacturer": manufacturer,
|
||||
"serialNumber": serial,
|
||||
'size': size,
|
||||
'resolutionWidth': resolution_width,
|
||||
'resolutionHeight': resolution_height,
|
||||
"productionDate": production_date,
|
||||
'technology': technology,
|
||||
'refreshRate': refresh,
|
||||
}
|
||||
)
|
||||
|
||||
def get_hwinfo_monitors(self):
|
||||
for c in self.hwinfo:
|
||||
monitor = None
|
||||
external = None
|
||||
for x in c:
|
||||
if 'Hardware Class: monitor' in x:
|
||||
monitor = c
|
||||
if 'Driver Info' in x:
|
||||
external = c
|
||||
|
||||
if monitor and not external:
|
||||
self.monitors.append(c)
|
||||
|
||||
def get_size_monitor(self, x):
|
||||
i = 1 / 25.4
|
||||
t = x.split('Size: ')[-1].strip()
|
||||
tt = t.split('mm')
|
||||
if not tt:
|
||||
return 0
|
||||
sizes = tt[0].strip()
|
||||
if 'x' not in sizes:
|
||||
return 0
|
||||
w, h = [int(x) for x in sizes.split('x')]
|
||||
return "{:.2f}".format(np.sqrt(w**2 + h**2) * i)
|
||||
|
||||
def get_cpu_address(self, cpu):
|
||||
default = 64
|
||||
for ch in self.lshw.get('children', []):
|
||||
for c in ch.get('children', []):
|
||||
if c['class'] == 'processor':
|
||||
return c.get('width', default)
|
||||
return default
|
||||
|
||||
def get_usb_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "USB" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_serial_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "SERIAL" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_firmware_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "FIRMWARE" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_pcmcia_num(self):
|
||||
return len(
|
||||
[
|
||||
u
|
||||
for u in self.dmi.get("Port Connector")
|
||||
if "PCMCIA" in u.get("Port Type", "").upper()
|
||||
]
|
||||
)
|
||||
|
||||
def get_bios_date(self):
|
||||
return self.dmi.get("BIOS")[0].get("Release Date", self.default)
|
||||
|
||||
def get_firmware(self):
|
||||
return self.dmi.get("BIOS")[0].get("Firmware Revision", '1')
|
||||
|
||||
def get_max_ram_size(self):
|
||||
size = 0
|
||||
for slot in self.dmi.get("Physical Memory Array"):
|
||||
capacity = slot.get("Maximum Capacity", '0').split(" ")[0]
|
||||
size += int(capacity)
|
||||
|
||||
return size
|
||||
|
||||
def get_ram_slots(self):
|
||||
slots = 0
|
||||
for x in self.dmi.get("Physical Memory Array"):
|
||||
slots += int(x.get("Number Of Devices", 0))
|
||||
return slots
|
||||
|
||||
def get_ram_size(self, ram):
|
||||
memory = ram.get("Size", "0")
|
||||
return memory
|
||||
|
||||
def get_ram_speed(self, ram):
|
||||
size = ram.get("Speed", "0")
|
||||
return size
|
||||
|
||||
def get_cpu_speed(self, cpu):
|
||||
speed = cpu.get('Max Speed', "0")
|
||||
return speed
|
||||
|
||||
def get_sku(self):
|
||||
return self.dmi.get("System")[0].get("SKU Number", self.default).strip()
|
||||
|
||||
def get_version(self):
|
||||
return self.dmi.get("System")[0].get("Version", self.default).strip()
|
||||
|
||||
def get_uuid(self):
|
||||
return self.dmi.get("System")[0].get("UUID", '').strip()
|
||||
|
||||
def get_family(self):
|
||||
return self.dmi.get("System")[0].get("Family", '')
|
||||
|
||||
def get_chassis(self):
|
||||
return self.dmi.get("Chassis")[0].get("Type", '_virtual')
|
||||
|
||||
def get_type(self):
|
||||
chassis_type = self.get_chassis()
|
||||
return self.translation_to_devicehub(chassis_type)
|
||||
|
||||
def translation_to_devicehub(self, original_type):
|
||||
lower_type = original_type.lower()
|
||||
CHASSIS_TYPE = {
|
||||
'Desktop': [
|
||||
'desktop',
|
||||
'low-profile',
|
||||
'tower',
|
||||
'docking',
|
||||
'all-in-one',
|
||||
'pizzabox',
|
||||
'mini-tower',
|
||||
'space-saving',
|
||||
'lunchbox',
|
||||
'mini',
|
||||
'stick',
|
||||
],
|
||||
'Laptop': [
|
||||
'portable',
|
||||
'laptop',
|
||||
'convertible',
|
||||
'tablet',
|
||||
'detachable',
|
||||
'notebook',
|
||||
'handheld',
|
||||
'sub-notebook',
|
||||
],
|
||||
'Server': ['server'],
|
||||
'Computer': ['_virtual'],
|
||||
}
|
||||
for k, v in CHASSIS_TYPE.items():
|
||||
if lower_type in v:
|
||||
return k
|
||||
return self.default
|
||||
|
||||
def get_chassis_dh(self):
|
||||
chassis = self.get_chassis()
|
||||
lower_type = chassis.lower()
|
||||
for k, v in CHASSIS_DH.items():
|
||||
if lower_type in v:
|
||||
return k
|
||||
return self.default
|
||||
|
||||
def get_data_storage_type(self, x):
|
||||
# TODO @cayop add more SSDS types
|
||||
SSDS = ["nvme"]
|
||||
SSD = 'SolidStateDrive'
|
||||
HDD = 'HardDrive'
|
||||
type_dev = x.get('device', {}).get('type')
|
||||
trim = x.get('trim', {}).get("supported") in [True, "true"]
|
||||
return SSD if type_dev in SSDS or trim else HDD
|
||||
|
||||
def get_data_storage_interface(self, x):
|
||||
interface = x.get('device', {}).get('protocol', 'ATA')
|
||||
if interface.upper() in DATASTORAGEINTERFACE:
|
||||
return interface.upper()
|
||||
|
||||
txt = "Sid: {}, interface {} is not in DataStorageInterface Enum".format(
|
||||
self.sid, interface
|
||||
)
|
||||
self.errors("{}".format(err))
|
||||
|
||||
def get_data_storage_size(self, x):
|
||||
return x.get('user_capacity', {}).get('bytes')
|
||||
|
||||
def parse_hwinfo(self):
|
||||
hw_blocks = self.hwinfo_raw.split("\n\n")
|
||||
return [x.split("\n") for x in hw_blocks]
|
||||
|
||||
def loads(self, x):
|
||||
if isinstance(x, str):
|
||||
try:
|
||||
try:
|
||||
hw = json.loads(x)
|
||||
except json.decoder.JSONDecodeError:
|
||||
hw = json.loads(repair_json(x))
|
||||
return hw
|
||||
except Exception as ss:
|
||||
logger.warning("%s", ss)
|
||||
return {}
|
||||
return x
|
||||
|
||||
def errors(self, txt=None):
|
||||
if not txt:
|
||||
return self._errors
|
||||
|
||||
logger.error(txt)
|
||||
self._errors.append("%s", txt)
|
||||
if snapshot.get("credentialSubject"):
|
||||
self.build = normal_parse_details.ParseSnapshot(
|
||||
snapshot,
|
||||
default=default
|
||||
)
|
||||
elif snapshot.get("software") != "workbench-script":
|
||||
self.build = old_parse_details.ParseSnapshot(
|
||||
snapshot,
|
||||
default=default
|
||||
)
|
||||
elif snapshot.get("data",{}).get("lshw"):
|
||||
self.build = legacy_parse_details.ParseSnapshot(
|
||||
snapshot,
|
||||
default=default
|
||||
)
|
||||
else:
|
||||
self.build = normal_parse_details.ParseSnapshot(
|
||||
snapshot,
|
||||
default=default
|
||||
)
|
||||
|
||||
self.default = default
|
||||
self.device = self.build.snapshot_json.get("device")
|
||||
self.components = self.build.snapshot_json.get("components")
|
||||
|
|
|
@ -137,7 +137,7 @@ class DownloadEvidenceView(DashboardView, TemplateView):
|
|||
evidence.get_doc()
|
||||
data = json.dumps(evidence.doc)
|
||||
response = HttpResponse(data, content_type="application/json")
|
||||
response['Content-Disposition'] = 'attachment; filename={}'.format("credential.json")
|
||||
response['Content-Disposition'] = 'attachment; filename={}'.format("evidence.json")
|
||||
return response
|
||||
|
||||
|
||||
|
|
|
@ -22,10 +22,14 @@ def search(institution, qs, offset=0, limit=10):
|
|||
qp.set_stemming_strategy(xapian.QueryParser.STEM_SOME)
|
||||
qp.add_prefix("uuid", "uuid")
|
||||
query = qp.parse_query(qs)
|
||||
institution_term = "U{}".format(institution.id)
|
||||
final_query = xapian.Query(
|
||||
xapian.Query.OP_AND, query, xapian.Query(institution_term)
|
||||
)
|
||||
if institution:
|
||||
institution_term = "U{}".format(institution.id)
|
||||
final_query = xapian.Query(
|
||||
xapian.Query.OP_AND, query, xapian.Query(institution_term)
|
||||
)
|
||||
else:
|
||||
final_query = xapian.Query(query)
|
||||
|
||||
enquire = xapian.Enquire(database)
|
||||
enquire.set_query(final_query)
|
||||
matches = enquire.get_mset(offset, limit)
|
||||
|
|
1
example/dpp-snapshots/hp_probook_450.json
Normal file
1
example/dpp-snapshots/hp_probook_450.json
Normal file
File diff suppressed because one or more lines are too long
1
example/dpp-snapshots/hp_probook_450_2.json
Normal file
1
example/dpp-snapshots/hp_probook_450_2.json
Normal file
File diff suppressed because one or more lines are too long
1
example/dpp-snapshots/hp_probook_g2.json
Normal file
1
example/dpp-snapshots/hp_probook_g2.json
Normal file
File diff suppressed because one or more lines are too long
1
example/dpp-snapshots/hp_probook_g8.json
Normal file
1
example/dpp-snapshots/hp_probook_g8.json
Normal file
File diff suppressed because one or more lines are too long
1
example/dpp-snapshots/snapshot01.json
Normal file
1
example/dpp-snapshots/snapshot01.json
Normal file
|
@ -0,0 +1 @@
|
|||
{"closed": true, "components": [{"actions": [], "manufacturer": "Intel Corporation", "model": "82579LM Gigabit Network Connection", "serialNumber": "00:11:11:11:11:00", "speed": 1000.0, "type": "NetworkAdapter", "variant": "04", "wireless": false}, {"actions": [], "manufacturer": "Intel Corporation", "model": "7 Series/C216 Chipset Family High Definition Audio Controller", "serialNumber": null, "type": "SoundCard"}, {"actions": [], "format": "DIMM", "interface": "DDR3", "manufacturer": "Micron", "model": "16KTF51264AZ", "serialNumber": "AAAAAAAA", "size": 4096.0, "speed": 1600.0, "type": "RamModule"}, {"actions": [{"endTime": "2022-10-11T13:45:31.239555+00:00", "severity": "Info", "startTime": "2021-10-11T09:45:19.623967+00:00", "steps": [{"endTime": "2021-10-11T11:05:28.090897+00:00", "severity": "Info", "startTime": "2021-10-11T09:45:19.624163+00:00", "type": "StepZero"}, {"endTime": "2021-10-11T13:45:31.239402+00:00", "severity": "Info", "startTime": "2021-10-11T11:05:28.091255+00:00", "type": "StepRandom"}], "type": "EraseSectors"}, {"assessment": true, "commandTimeout": 30, "currentPendingSectorCount": 0, "elapsed": 60, "length": "Short", "lifetime": 18720, "offlineUncorrectable": 0, "powerCycleCount": 2147, "reallocatedSectorCount": 0, "reportedUncorrectableErrors": 0, "severity": "Info", "status": "Completed without error", "type": "TestDataStorage"}, {"elapsed": 11, "readSpeed": 119.0, "type": "BenchmarkDataStorage", "writeSpeed": 32.7}], "interface": "ATA", "manufacturer": "Seagate", "model": "ST3500418AS", "serialNumber": "AAAAAAAA", "size": 500000.0, "type": "HardDrive", "variant": "CC46"}, {"actions": [{"elapsed": 0, "rate": 25540.36, "type": "BenchmarkProcessor"}, {"elapsed": 8, "rate": 7.6939, "type": "BenchmarkProcessorSysbench"}], "address": 64, "brand": "Core i5", "cores": 4, "generation": 3, "manufacturer": "Intel Corp.", "model": "Intel Core i5-3470 CPU @ 3.20GHz", "serialNumber": null, "speed": 1.6242180000000002, "threads": 4, "type": "Processor"}, {"actions": [], "manufacturer": "Intel Corporation", "memory": null, "model": "Xeon E3-1200 v2/3rd Gen Core processor Graphics Controller", "serialNumber": null, "type": "GraphicCard"}, {"actions": [], "biosDate": "2012-08-07T00:00:00", "firewire": 0, "manufacturer": "LENOVO", "model": "MAHOBAY", "pcmcia": 0, "ramMaxSize": 32, "ramSlots": 4, "serial": 1, "serialNumber": null, "slots": 4, "type": "Motherboard", "usb": 3, "version": "9SKT39AUS"}], "device": {"actions": [{"elapsed": 1, "rate": 0.6507, "type": "BenchmarkRamSysbench"}], "chassis": "Tower", "manufacturer": "LENOVO", "model": "3227A2G", "serialNumber": "AAAAAAAA", "sku": "LENOVO_MT_3227", "type": "Desktop", "version": "ThinkCentre M92P"}, "elapsed": 187302510, "endTime": "2016-11-03T17:17:01.116554+00:00", "software": "Workbench", "type": "Snapshot", "uuid": "ae913de1-e639-476a-ad9b-78eabbe4628b", "version": "11.0b11"}
|
1
example/dpp-snapshots/snapshot02.json
Normal file
1
example/dpp-snapshots/snapshot02.json
Normal file
|
@ -0,0 +1 @@
|
|||
{"closed": true, "components": [{"actions": [], "manufacturer": "Intel Corporation", "model": "82579LM Gigabit Network Connection", "serialNumber": "00:11:11:11:11:00", "speed": 1000.0, "type": "NetworkAdapter", "variant": "04", "wireless": false}, {"actions": [], "manufacturer": "Intel Corporation", "model": "7 Series/C216 Chipset Family High Definition Audio Controller", "serialNumber": null, "type": "SoundCard"}, {"actions": [], "format": "DIMM", "interface": "DDR3", "manufacturer": "Micron", "model": "16KTF51264AZ", "serialNumber": "AAAAAAAA", "size": 4096.0, "speed": 1600.0, "type": "RamModule"}, {"actions": [{"endTime": "2022-10-11T13:45:31.239555+00:00", "severity": "Info", "startTime": "2021-10-11T09:45:19.623967+00:00", "steps": [{"endTime": "2021-10-11T11:05:28.090897+00:00", "severity": "Info", "startTime": "2021-10-11T09:45:19.624163+00:00", "type": "StepZero"}, {"endTime": "2021-10-11T13:45:31.239402+00:00", "severity": "Info", "startTime": "2021-10-11T11:05:28.091255+00:00", "type": "StepRandom"}], "type": "EraseSectors"}, {"assessment": true, "commandTimeout": 30, "currentPendingSectorCount": 0, "elapsed": 60, "length": "Short", "lifetime": 18720, "offlineUncorrectable": 0, "powerCycleCount": 2147, "reallocatedSectorCount": 0, "reportedUncorrectableErrors": 0, "severity": "Info", "status": "Completed without error", "type": "TestDataStorage"}, {"elapsed": 11, "readSpeed": 119.0, "type": "BenchmarkDataStorage", "writeSpeed": 32.7}], "interface": "ATA", "manufacturer": "Seagate", "model": "ST3500418AS", "serialNumber": "AAAAAAAA", "size": 500000.0, "type": "HardDrive", "variant": "CC46"}, {"actions": [{"elapsed": 0, "rate": 25540.36, "type": "BenchmarkProcessor"}, {"elapsed": 8, "rate": 7.6939, "type": "BenchmarkProcessorSysbench"}], "address": 64, "brand": "Core i5", "cores": 4, "generation": 3, "manufacturer": "Intel Corp.", "model": "Intel Core i5-3470 CPU @ 3.20GHz", "serialNumber": null, "speed": 1.6242180000000002, "threads": 4, "type": "Processor"}, {"actions": [], "manufacturer": "Intel Corporation", "memory": null, "model": "Xeon E3-1200 v2/3rd Gen Core processor Graphics Controller", "serialNumber": null, "type": "GraphicCard"}, {"actions": [], "biosDate": "2012-08-07T00:00:00", "firewire": 0, "manufacturer": "LENOVO", "model": "MAHOBAY", "pcmcia": 0, "ramMaxSize": 32, "ramSlots": 4, "serial": 1, "serialNumber": null, "slots": 4, "type": "Motherboard", "usb": 3, "version": "9SKT39AUS"}], "device": {"actions": [{"elapsed": 1, "rate": 0.6507, "type": "BenchmarkRamSysbench"}], "chassis": "Tower", "manufacturer": "LENOVO", "model": "3227A2G", "serialNumber": "AAAAAAAAD", "sku": "LENOVO_MT_3227", "type": "Desktop", "version": "ThinkCentre M92P"}, "elapsed": 187302510, "endTime": "2016-11-03T17:17:01.116554+00:00", "software": "Workbench", "type": "Snapshot", "uuid": "ae913de1-e639-476a-ad9b-78eabbe4625b", "version": "11.0b11"}
|
1
example/snapshots/snapshot_workbench-script_legacy.json
Normal file
1
example/snapshots/snapshot_workbench-script_legacy.json
Normal file
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
1
example/snapshots/snapshot_workbench-script_signed.json
Normal file
1
example/snapshots/snapshot_workbench-script_signed.json
Normal file
File diff suppressed because one or more lines are too long
|
@ -11,3 +11,7 @@ xlrd==2.0.1
|
|||
odfpy==1.4.1
|
||||
pytz==2024.2
|
||||
json-repair==0.30.0
|
||||
setuptools==65.5.1
|
||||
requests==2.32.3
|
||||
wheel==0.45.1
|
||||
|
||||
|
|
|
@ -13,12 +13,23 @@ HID_ALGO1 = [
|
|||
"manufacturer",
|
||||
"model",
|
||||
"chassis",
|
||||
"serialNumber",
|
||||
"serial_number",
|
||||
"sku"
|
||||
]
|
||||
|
||||
LEGACY_DPP = [
|
||||
"manufacturer",
|
||||
"model",
|
||||
"chassis",
|
||||
"serial_number",
|
||||
"sku",
|
||||
"type",
|
||||
"version"
|
||||
]
|
||||
|
||||
ALGOS = {
|
||||
"hidalgo1": HID_ALGO1,
|
||||
"legacy_dpp": LEGACY_DPP
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -18,20 +18,28 @@ class CustomFormatter(logging.Formatter):
|
|||
color = PURPLE
|
||||
else:
|
||||
color = RESET
|
||||
|
||||
|
||||
record.levelname = f"{color}{record.levelname}{RESET}"
|
||||
|
||||
if record.args:
|
||||
record.msg = self.highlight_args(record.msg, record.args, color)
|
||||
record.args = ()
|
||||
|
||||
# provide trace when DEBUG config
|
||||
if settings.DEBUG:
|
||||
import traceback
|
||||
print(traceback.format_exc())
|
||||
try:
|
||||
record.msg = record.msg % record.args
|
||||
record.args = ()
|
||||
except (TypeError, ValueError):
|
||||
record.msg = f"{color}{record.msg}{RESET}"
|
||||
|
||||
# Highlight the final formatted message
|
||||
record.msg = self.highlight_message(record.msg, color)
|
||||
|
||||
# pedro says: I discovered that trace is provided anyway with
|
||||
# this commented (reason: strange None msgs)
|
||||
# is this needed?
|
||||
### provide trace when DEBUG config
|
||||
#if settings.DEBUG:
|
||||
# import traceback
|
||||
# print(traceback.format_exc())
|
||||
|
||||
return super().format(record)
|
||||
|
||||
def highlight_args(self, message, args, color):
|
||||
highlighted_args = tuple(f"{color}{arg}{RESET}" for arg in args)
|
||||
return message % highlighted_args
|
||||
def highlight_message(self, message, color):
|
||||
return f"{color}{message}{RESET}"
|
||||
|
|
|
@ -19,7 +19,10 @@ def move_json(path_name, user, place="snapshots"):
|
|||
|
||||
|
||||
def save_in_disk(data, user, place="snapshots"):
|
||||
uuid = data.get('uuid', '')
|
||||
uuid = data.get("uuid")
|
||||
if data.get("credentialSubject"):
|
||||
uuid = data["credentialSubject"].get("uuid")
|
||||
|
||||
now = datetime.now()
|
||||
year = now.year
|
||||
month = now.month
|
||||
|
|
Loading…
Reference in a new issue