2024-07-18 15:21:22 +00:00
|
|
|
import json
|
2024-11-18 18:37:08 +00:00
|
|
|
import hashlib
|
2024-07-18 15:21:22 +00:00
|
|
|
|
2024-09-18 16:01:46 +00:00
|
|
|
from dmidecode import DMIParse
|
2024-06-12 07:32:49 +00:00
|
|
|
from django.db import models
|
2024-07-18 15:21:22 +00:00
|
|
|
|
2024-10-23 11:39:16 +00:00
|
|
|
from utils.constants import STR_EXTEND_SIZE, CHASSIS_DH
|
2024-07-26 15:59:34 +00:00
|
|
|
from evidence.xapian import search
|
2024-11-15 11:47:08 +00:00
|
|
|
from evidence.parse_details import ParseSnapshot, get_inxi, get_inxi_key
|
2024-12-20 16:04:09 +00:00
|
|
|
from evidence.legacy_parse_details import ParseSnapshot as legacy_ParseSnapshot
|
2024-10-04 15:32:53 +00:00
|
|
|
from user.models import User, Institution
|
2024-06-12 07:32:49 +00:00
|
|
|
|
|
|
|
|
2024-07-31 11:28:46 +00:00
|
|
|
class Annotation(models.Model):
    """Key/value record attached to a UUID and owned by an institution.

    Each row carries a ``type`` (see :class:`Annotation.Type`), a ``key`` and
    a ``value``. The combination ``(type, key, uuid)`` is unique.
    """

    class Type(models.IntegerChoices):
        """Integer codes stored in the ``type`` column."""

        SYSTEM = 0, "System"
        USER = 1, "User"
        DOCUMENT = 2, "Document"
        ERASE_SERVER = 3, "EraseServer"

    # Set once, when the row is first inserted.
    created = models.DateTimeField(auto_now_add=True)
    # UUID this annotation refers to; Evidence filters annotations by it.
    uuid = models.UUIDField()
    # Deleting the institution deletes its annotations.
    owner = models.ForeignKey(Institution, on_delete=models.CASCADE)
    # Optional; the annotation survives (with user=NULL) if the user is
    # deleted.
    user = models.ForeignKey(
        User,
        on_delete=models.SET_NULL,
        null=True,
        blank=True,
    )
    type = models.SmallIntegerField(choices=Type)
    key = models.CharField(max_length=STR_EXTEND_SIZE)
    value = models.CharField(max_length=STR_EXTEND_SIZE)

    class Meta:
        constraints = [
            models.UniqueConstraint(
                fields=["type", "key", "uuid"],
                name="unique_type_key_uuid",
            ),
        ]
|
|
|
|
|
|
|
|
|
2024-07-26 15:59:34 +00:00
|
|
|
class Evidence:
    """In-memory view of one stored snapshot document, looked up by UUID.

    On construction the instance loads the annotations for ``uuid``, derives
    the owner from the oldest annotation, fetches the JSON document from the
    search index and extracts device details from it.

    Three document flavours are distinguished by the code below:

    * web snapshots  -- ``doc["type"] == "WebSnapshot"``; values live in
      ``doc["kv"]``.
    * legacy         -- no ``credentialSubject`` and ``doc["software"]`` is
      not ``"workbench-script"``; values live in ``doc["device"]``.
    * current        -- everything else; values come from the parsed inxi
      output when available, otherwise from the parsed dmidecode output.
    """

    def __init__(self, uuid):
        """Create the evidence for ``uuid`` and eagerly load its data."""
        self.uuid = uuid
        self.owner = None
        self.doc = None
        self.created = None
        self.dmi = None
        self.inxi = None
        self.annotations = []
        self.components = []
        self.default = "n/a"

        # Defaults for the values extracted from inxi. They must exist before
        # get_time() below triggers get_doc(), and they keep the getters from
        # raising AttributeError when inxi has no usable "Machine" section.
        self.device_manufacturer = ""
        self.device_model = ""
        self.device_serial_number = ""
        self.device_chassis = ""
        self.device_version = ""

        self.get_owner()
        self.get_time()

    def get_annotations(self):
        """Load every annotation for this UUID, oldest first."""
        self.annotations = Annotation.objects.filter(
            uuid=self.uuid
        ).order_by("created")

    def get_owner(self):
        """Set ``self.owner`` from the oldest annotation, if there is one."""
        if not self.annotations:
            self.get_annotations()
        a = self.annotations.first()
        if a:
            self.owner = a.owner

    def get_phid(self):
        """Return the SHA3-256 hex digest of the JSON-serialised document."""
        if not self.doc:
            self.get_doc()

        # hashlib only accepts bytes-like input, so the JSON text has to be
        # encoded before hashing.
        payload = json.dumps(self.doc).encode("utf-8")
        return hashlib.sha3_256(payload).hexdigest()

    def get_doc(self):
        """Fetch the document from the search index and parse device data.

        Resets ``self.doc`` to ``{}`` and ``self.inxi`` to ``None`` first.
        When nothing is found the document stays empty. Legacy documents (see
        ``is_legacy``) are stored as-is without any further parsing.
        """
        self.doc = {}
        self.inxi = None

        if not self.owner:
            self.get_owner()

        qry = 'uuid:"{}"'.format(self.uuid)
        matches = search(self.owner, qry, limit=1)
        # Nothing to load when the search yields no result set or an empty
        # one.
        if not matches or matches.size() < 1:
            return

        for xa in matches:
            self.doc = json.loads(xa.document.get_data())

        if self.is_legacy():
            return

        if self.doc.get("credentialSubject"):
            dmidecode_raw = None
            for ev in self.doc["evidence"]:
                operation = ev.get("operation")
                if operation == "dmidecode":
                    dmidecode_raw = ev["output"]
                elif operation == "inxi":
                    self.inxi = ev["output"]
            # Only parse when a dmidecode entry was actually present.
            if dmidecode_raw is not None:
                self.dmi = DMIParse(dmidecode_raw)
        else:
            dmidecode_raw = self.doc["data"]["dmidecode"]
            inxi_raw = self.doc.get("data", {}).get("inxi")
            self.dmi = DMIParse(dmidecode_raw)
            try:
                self.inxi = json.loads(inxi_raw)
            except (TypeError, ValueError):
                # Missing (None) or malformed inxi output: leave self.inxi
                # unset so the getters fall back to dmidecode.
                pass

        if self.inxi:
            try:
                machine = get_inxi_key(self.inxi, 'Machine')
                for m in machine:
                    system = get_inxi(m, "System")
                    if system:
                        self.device_manufacturer = system
                        self.device_model = get_inxi(m, "product")
                        self.device_serial_number = get_inxi(m, "serial")
                        self.device_chassis = get_inxi(m, "Type")
                        self.device_version = get_inxi(m, "v")
            except Exception:
                # Best effort: an unexpected inxi layout must not prevent
                # the evidence from loading; device_* keep their defaults.
                return

    def get_time(self):
        """Set ``self.created`` from the document or the newest annotation.

        The document's ``endTime`` wins. Without it, the ``created`` value of
        the most recent annotation is used; if there are no annotations
        either, ``self.created`` is left as ``None``.
        """
        if not self.doc:
            self.get_doc()
        self.created = self.doc.get("endTime")

        if not self.created:
            latest = self.annotations.last() if self.annotations else None
            if latest is not None:
                self.created = latest.created

    def get_components(self):
        """Return the component list of the snapshot."""
        if self.is_legacy():
            return self.doc.get('components', [])
        self.set_components()
        return self.components

    def _kv_value(self, index):
        """Return the ``index``-th value of a web snapshot's ``kv`` mapping.

        Returns ``""`` when the mapping has fewer than ``index + 1`` entries.
        """
        kv = self.doc.get('kv', {})
        if len(kv) < index + 1:
            return ""
        return list(kv.values())[index]

    def get_manufacturer(self):
        """Return the device manufacturer for this document flavour."""
        if self.is_web_snapshot():
            # Web snapshots keep the manufacturer as the first kv value.
            return self._kv_value(0)

        if self.is_legacy():
            return self.doc['device']['manufacturer']

        if self.inxi:
            return self.device_manufacturer

        return self.dmi.manufacturer().strip()

    def get_model(self):
        """Return the device model for this document flavour."""
        if self.is_web_snapshot():
            # Web snapshots keep the model as the second kv value.
            return self._kv_value(1)

        if self.is_legacy():
            return self.doc['device']['model']

        if self.inxi:
            return self.device_model

        return self.dmi.model().strip()

    def get_chassis(self):
        """Return the chassis of the device.

        Without inxi data, the dmidecode chassis type is lower-cased and
        mapped through ``CHASSIS_DH``; ``""`` is returned when no entry
        contains it.
        """
        if self.is_legacy():
            # NOTE(review): this returns the *model* of a legacy device, the
            # same key get_model() reads -- looks like a copy-paste slip;
            # confirm which key holds the chassis in legacy documents.
            return self.doc['device']['model']

        if self.inxi:
            return self.device_chassis

        chassis = self.dmi.get("Chassis")[0].get("Type", '_virtual')
        lower_type = chassis.lower()

        for k, v in CHASSIS_DH.items():
            if lower_type in v:
                return k
        return ""

    def get_serial_number(self):
        """Return the device serial number for this document flavour."""
        if self.is_legacy():
            return self.doc['device']['serialNumber']

        if self.inxi:
            return self.device_serial_number

        return self.dmi.serial_number().strip()

    def get_version(self):
        """Return the device version from inxi, or ``""`` without inxi."""
        if self.inxi:
            return self.device_version

        return ""

    @classmethod
    def get_all(cls, user):
        """Return distinct ``(uuid, created)`` pairs, newest first.

        Only SYSTEM annotations with key ``"hidalgo1"`` that belong to the
        institution of ``user`` are considered.
        """
        return Annotation.objects.filter(
            owner=user.institution,
            type=Annotation.Type.SYSTEM,
            key="hidalgo1",
        ).order_by("-created").values_list("uuid", "created").distinct()

    def set_components(self):
        """Parse the document and store its components on the instance.

        ``workbench-script`` documents that carry ``data["lshw"]`` go through
        the legacy parser; every other document uses the current one.
        """
        parse = ParseSnapshot
        if self.doc.get("software") == "workbench-script":
            if self.doc.get("data", {}).get("lshw"):
                parse = legacy_ParseSnapshot

        snapshot = parse(self.doc).snapshot_json
        self.components = snapshot['components']

    def is_legacy(self):
        """Return True for documents not produced by ``workbench-script``.

        Documents with a ``credentialSubject`` are never legacy.
        """
        if self.doc.get("credentialSubject"):
            return False

        return self.doc.get("software") != "workbench-script"

    def is_web_snapshot(self):
        """Return True when the document's ``type`` is ``"WebSnapshot"``."""
        return self.doc.get("type") == "WebSnapshot"
|