From ca6d98368d598877dc59d988f57236b806d4809a Mon Sep 17 00:00:00 2001 From: Cayo Puigdefabregas Date: Thu, 23 Jan 2025 13:05:09 +0100 Subject: [PATCH] fix annotations in parse --- evidence/forms.py | 18 +++-- evidence/parse.py | 170 ++-------------------------------------------- 2 files changed, 16 insertions(+), 172 deletions(-) diff --git a/evidence/forms.py b/evidence/forms.py index b5516ab..ee630ca 100644 --- a/evidence/forms.py +++ b/evidence/forms.py @@ -72,6 +72,7 @@ class UserTagForm(forms.Form): self.pk = None self.uuid = kwargs.pop('uuid', None) self.user = kwargs.pop('user') + instance = SystemProperty.objects.filter( uuid=self.uuid, key='CUSTOM_ID', @@ -89,6 +90,7 @@ class UserTagForm(forms.Form): if not data: return False self.tag = data + self.instance = SystemProperty.objects.filter( uuid=self.uuid, key='CUSTOM_ID', @@ -104,15 +106,18 @@ class UserTagForm(forms.Form): if self.instance: old_value = self.instance.value if not self.tag: - message =_(" Evidence Tag. Old Value: '{}'").format(old_value) + message = _(" Evidence Tag. Old Value: '{}'").format( + old_value + ) self.instance.delete() else: self.instance.value = self.tag self.instance.save() if old_value != self.tag: - message=_(" Evidence Tag. Old Value: '{}'. New Value: '{}'").format(old_value, self.tag) + msg = " Evidence Tag. Old Value: '{}'. New Value: '{}'" + message=_(msg).format(old_value, self.tag) else: - message =_(" Evidence Tag. Value: '{}'").format(self.tag) + message = _(" Evidence Tag. Value: '{}'").format(self.tag) SystemProperty.objects.create( uuid=self.uuid, key='CUSTOM_ID', @@ -120,7 +125,7 @@ class UserTagForm(forms.Form): owner=self.user.institution, user=self.user ) - + DeviceLog.objects.create( snapshot_uuid=self.uuid, event= message, @@ -129,6 +134,7 @@ class UserTagForm(forms.Form): ) + class ImportForm(forms.Form): file_import = forms.FileField(label=_("File to import")) @@ -177,8 +183,8 @@ class ImportForm(forms.Form): table = [] for row in self.rows: doc = create_doc(row) - property = create_property(doc, self.user) - table.append((doc, property)) + prop = create_property(doc, self.user) + table.append((doc, prop)) if commit: for doc, cred in table: diff --git a/evidence/parse.py b/evidence/parse.py index ffb6608..403650b 100644 --- a/evidence/parse.py +++ b/evidence/parse.py @@ -65,23 +65,21 @@ class Build: index(self.user.institution, self.uuid, snap) def create_annotations(self): - annotation = Annotation.objects.filter( + prop = SystemProperty.objects.filter( uuid=self.uuid, owner=self.user.institution, - type=Annotation.Type.SYSTEM, ) - if annotation: - txt = "Warning: Snapshot %s already registered (annotation exists)" + if prop: + txt = "Warning: Snapshot %s already registered (property exists)" logger.warning(txt, self.uuid) return for k, v in self.build.algorithms.items(): - Annotation.objects.create( + SystemProperty.objects.create( uuid=self.uuid, owner=self.user.institution, user=self.user, - type=Annotation.Type.SYSTEM, key=k, value=self.sign(v) ) @@ -95,163 +93,3 @@ class Build: phid = self.sign(json.dumps(self.build.get_doc())) register_device_dlt(chid, phid, self.uuid, self.user) register_passport_dlt(chid, phid, self.uuid, self.user) - - -class Build2: - def __init__(self, evidence_json, user, check=False): - if evidence_json.get("data",{}).get("lshw"): - if evidence_json.get("software") == "workbench-script": - return legacy_parse.Build(evidence_json, user, check=check) - - self.evidence = evidence_json.copy() - self.json = evidence_json.copy() - - if evidence_json.get("credentialSubject"): - self.json.update(evidence_json["credentialSubject"]) - if evidence_json.get("evidence"): - self.json["data"] = {} - for ev in evidence_json["evidence"]: - k = ev.get("operation") - if not k: - continue - self.json["data"][k] = ev.get("output") - - self.uuid = self.json['uuid'] - self.user = user - self.hid = None - self.chid = None - self.phid = self.get_signature(self.json) - self.generate_chids() - - if check: - return - - self.index() - self.create_properties() - if settings.DPP: - self.register_device_dlt() - - def index(self): - snap = json.dumps(self.evidence) - index(self.user.institution, self.uuid, snap) - - def generate_chids(self): - self.algorithms = { - 'hidalgo1': self.get_hid_14(), - 'legacy_dpp': self.get_chid_dpp(), - } - - def get_hid_14(self): - if self.json.get("software") == "workbench-script": - hid = self.get_hid(self.json) - else: - device = self.json['device'] - manufacturer = device.get("manufacturer", '') - model = device.get("model", '') - chassis = device.get("chassis", '') - serial_number = device.get("serialNumber", '') - sku = device.get("sku", '') - hid = f"{manufacturer}{model}{chassis}{serial_number}{sku}" - - self.chid = hashlib.sha3_256(hid.encode()).hexdigest() - return self.chid - - def get_chid_dpp(self): - if self.json.get("software") == "workbench-script": - device = ParseSnapshot(self.json).device - else: - device = self.json['device'] - - hid = self.get_id_hw_dpp(device) - self.chid = hashlib.sha3_256(hid.encode("utf-8")).hexdigest() - return self.chid - - def get_id_hw_dpp(self, d): - manufacturer = d.get("manufacturer", '') - model = d.get("model", '') - chassis = d.get("chassis", '') - serial_number = d.get("serialNumber", '') - sku = d.get("sku", '') - typ = d.get("type", '') - version = d.get("version", '') - - return f"{manufacturer}{model}{chassis}{serial_number}{sku}{typ}{version}" - - def get_phid(self): - if self.json.get("software") == "workbench-script": - data = ParseSnapshot(self.json) - self.device = data.device - self.components = data.components - else: - self.device = self.json.get("device") - self.components = self.json.get("components", []) - - self.device.pop("actions", None) - for c in self.components: - c.pop("actions", None) - - device = self.get_id_hw_dpp(self.device) - components = sorted(self.components, key=lambda x: x.get("type")) - doc = [("computer", device)] - - for c in components: - doc.append((c.get("type"), self.get_id_hw_dpp(c))) - - return doc - - def create_properties(self): - property = SystemProperty.objects.filter( - uuid=self.uuid, - owner=self.user.institution, - ) - - if property: - txt = "Warning: Snapshot %s already registered (property exists)" - logger.warning(txt, self.uuid) - return - - for k, v in self.algorithms.items(): - SystemProperty.objects.create( - uuid=self.uuid, - owner=self.user.institution, - user=self.user, - key=k, - value=v - ) - - def get_hid(self, snapshot): - try: - self.inxi = self.json["data"]["inxi"] - if isinstance(self.inxi, str): - self.inxi = json.loads(self.inxi) - except Exception: - logger.error("No inxi in snapshot %s", self.uuid) - return "" - - machine = get_inxi_key(self.inxi, 'Machine') - for m in machine: - system = get_inxi(m, "System") - if system: - manufacturer = system - model = get_inxi(m, "product") - serial_number = get_inxi(m, "serial") - chassis = get_inxi(m, "Type") - else: - sku = get_inxi(m, "part-nu") - - mac = get_mac(self.inxi) or "" - if not mac: - txt = "Could not retrieve MAC address in snapshot %s" - logger.warning(txt, snapshot['uuid']) - return f"{manufacturer}{model}{chassis}{serial_number}{sku}" - - return f"{manufacturer}{model}{chassis}{serial_number}{sku}{mac}" - - def get_signature(self, doc): - return hashlib.sha3_256(json.dumps(doc).encode()).hexdigest() - - def register_device_dlt(self): - chid = self.algorithms.get('legacy_dpp') - phid = self.get_signature(self.get_phid()) - register_device_dlt(chid, phid, self.uuid, self.user) - register_passport_dlt(chid, phid, self.uuid, self.user)