From 7de6d69a6c1bf9444c38b0688df4e20472784ff2 Mon Sep 17 00:00:00 2001 From: Cayo Puigdefabregas Date: Thu, 5 Dec 2024 19:23:53 +0100 Subject: [PATCH] fix parsing with credentials --- dashboard/views.py | 7 ++++-- evidence/models.py | 53 +++++++++++++++++++++++++-------------- evidence/parse.py | 5 +++- evidence/parse_details.py | 25 ++++++++++++------ 4 files changed, 60 insertions(+), 30 deletions(-) diff --git a/dashboard/views.py b/dashboard/views.py index 8d91732..2ec12a0 100644 --- a/dashboard/views.py +++ b/dashboard/views.py @@ -84,8 +84,11 @@ class SearchView(InventaryMixin): return devices, count def get_annotations(self, xp): - snap = xp.document.get_data() - uuid = json.loads(snap).get('uuid') + snap = json.loads(xp.document.get_data()) + if snap.get("credentialSubject"): + uuid = snap["credentialSubject"]["uuid"] + else: + uuid = snap["uuid"] return Device.get_annotation_from_uuid(uuid, self.request.user.institution) def search_hids(self, query, offset, limit): diff --git a/evidence/models.py b/evidence/models.py index 242d885..286e4d6 100644 --- a/evidence/models.py +++ b/evidence/models.py @@ -71,25 +71,37 @@ class Evidence: for xa in matches: self.doc = json.loads(xa.document.get_data()) - if not self.is_legacy(): - dmidecode_raw = self.doc["data"]["dmidecode"] - inxi_raw = self.doc["data"]["inxi"] - self.dmi = DMIParse(dmidecode_raw) - try: - self.inxi = json.loads(inxi_raw) - machine = get_inxi_key(self.inxi, 'Machine') - for m in machine: - system = get_inxi(m, "System") - if system: - self.device_manufacturer = system - self.device_model = get_inxi(m, "product") - self.device_serial_number = get_inxi(m, "serial") - self.device_chassis = get_inxi(m, "Type") - self.device_version = get_inxi(m, "v") + if self.is_legacy(): + return + if self.doc.get("credentialSubject"): + for ev in self.doc["evidence"]: + if "dmidecode" == ev.get("operation"): + dmidecode_raw = ev["output"] + if "inxi" == ev.get("operation"): + self.inxi = ev["output"] + else: + dmidecode_raw = self.doc["data"]["dmidecode"] + try: + self.inxi = json.loads(self.doc["data"]["inxi"]) except Exception: return + self.dmi = DMIParse(dmidecode_raw) + try: + machine = get_inxi_key(self.inxi, 'Machine') + for m in machine: + system = get_inxi(m, "System") + if system: + self.device_manufacturer = system + self.device_model = get_inxi(m, "product") + self.device_serial_number = get_inxi(m, "serial") + self.device_chassis = get_inxi(m, "Type") + self.device_version = get_inxi(m, "v") + + except Exception: + return + def get_time(self): if not self.doc: self.get_doc() @@ -116,7 +128,7 @@ class Evidence: if self.inxi: return self.device_manufacturer - + return self.dmi.manufacturer().strip() def get_model(self): @@ -131,13 +143,13 @@ class Evidence: if self.inxi: return self.device_model - + return self.dmi.model().strip() def get_chassis(self): if self.is_legacy(): return self.doc['device']['model'] - + if self.inxi: return self.device_chassis @@ -152,7 +164,7 @@ class Evidence: def get_serial_number(self): if self.is_legacy(): return self.doc['device']['serialNumber'] - + if self.inxi: return self.device_serial_number @@ -178,6 +190,9 @@ class Evidence: self.components = snapshot['components'] def is_legacy(self): + if self.doc.get("credentialSubject"): + return False + return self.doc.get("software") != "workbench-script" def is_web_snapshot(self): diff --git a/evidence/parse.py b/evidence/parse.py index 9a8ec2a..304d1d3 100644 --- a/evidence/parse.py +++ b/evidence/parse.py @@ -25,6 +25,7 @@ class Build: def __init__(self, evidence_json, user, check=False): self.evidence = evidence_json.copy() self.json = evidence_json.copy() + if evidence_json.get("credentialSubject"): self.json.update(evidence_json["credentialSubject"]) if evidence_json.get("evidence"): @@ -94,7 +95,9 @@ class Build: def get_hid(self, snapshot): try: - self.inxi = json.loads(self.json["data"]["inxi"]) + self.inxi = self.json["data"]["inxi"] + if isinstance(self.inxi, str): + self.inxi = json.loads(self.inxi) except Exception: logger.error("No inxi in snapshot %s", self.uuid) return "" diff --git a/evidence/parse_details.py b/evidence/parse_details.py index 35adfc5..7ce0a5b 100644 --- a/evidence/parse_details.py +++ b/evidence/parse_details.py @@ -30,9 +30,20 @@ def get_inxi(n, name): class ParseSnapshot: def __init__(self, snapshot, default="n/a"): self.default = default - self.dmidecode_raw = snapshot["data"].get("dmidecode", "{}") - self.smart_raw = snapshot["data"].get("disks", []) - self.inxi_raw = snapshot["data"].get("inxi", "") or "" + self.dmidecode_raw = snapshot.get("data", {}).get("dmidecode", "{}") + self.smart_raw = snapshot.get("data", {}).get("smartctl", []) + self.inxi_raw = snapshot.get("data", {}).get("inxi", "") or "" + for ev in snapshot.get("evidence", []): + if "dmidecode" == ev.get("operation"): + self.dmidecode_raw = ev["output"] + if "inxi" == ev.get("operation"): + self.inxi_raw = ev["output"] + if "smartctl" == ev.get("operation"): + self.smart_raw = ev["output"] + data = snapshot + if snapshot.get("credentialSubject"): + data = snapshot["credentialSubject"] + self.device = {"actions": []} self.components = [] @@ -45,11 +56,10 @@ class ParseSnapshot: self.snapshot_json = { "type": "Snapshot", "device": self.device, - "software": snapshot["software"], + "software": data["software"], "components": self.components, - "uuid": snapshot['uuid'], - "version": snapshot['version'], - "endTime": snapshot["timestamp"], + "uuid": data['uuid'], + "endTime": data["timestamp"], "elapsed": 1, } @@ -267,7 +277,6 @@ class ParseSnapshot: hd["read used"] = get_inxi(d, "read-units") hd["written used"] = get_inxi(d, "written-units") - # import pdb; pdb.set_trace() self.components.append(hd) continue