fix annotations in parse

This commit is contained in:
Cayo Puigdefabregas 2025-01-23 13:05:09 +01:00
parent d8b1a632e6
commit ca6d98368d
2 changed files with 16 additions and 172 deletions

View file

@ -72,6 +72,7 @@ class UserTagForm(forms.Form):
self.pk = None
self.uuid = kwargs.pop('uuid', None)
self.user = kwargs.pop('user')
instance = SystemProperty.objects.filter(
uuid=self.uuid,
key='CUSTOM_ID',
@ -89,6 +90,7 @@ class UserTagForm(forms.Form):
if not data:
return False
self.tag = data
self.instance = SystemProperty.objects.filter(
uuid=self.uuid,
key='CUSTOM_ID',
@ -104,15 +106,18 @@ class UserTagForm(forms.Form):
if self.instance:
old_value = self.instance.value
if not self.tag:
message =_("<Deleted> Evidence Tag. Old Value: '{}'").format(old_value)
message = _("<Deleted> Evidence Tag. Old Value: '{}'").format(
old_value
)
self.instance.delete()
else:
self.instance.value = self.tag
self.instance.save()
if old_value != self.tag:
message=_("<Updated> Evidence Tag. Old Value: '{}'. New Value: '{}'").format(old_value, self.tag)
msg = "<Updated> Evidence Tag. Old Value: '{}'. New Value: '{}'"
message=_(msg).format(old_value, self.tag)
else:
message =_("<Created> Evidence Tag. Value: '{}'").format(self.tag)
message = _("<Created> Evidence Tag. Value: '{}'").format(self.tag)
SystemProperty.objects.create(
uuid=self.uuid,
key='CUSTOM_ID',
@ -120,7 +125,7 @@ class UserTagForm(forms.Form):
owner=self.user.institution,
user=self.user
)
DeviceLog.objects.create(
snapshot_uuid=self.uuid,
event= message,
@ -129,6 +134,7 @@ class UserTagForm(forms.Form):
)
class ImportForm(forms.Form):
file_import = forms.FileField(label=_("File to import"))
@ -177,8 +183,8 @@ class ImportForm(forms.Form):
table = []
for row in self.rows:
doc = create_doc(row)
property = create_property(doc, self.user)
table.append((doc, property))
prop = create_property(doc, self.user)
table.append((doc, prop))
if commit:
for doc, cred in table:

View file

@ -65,23 +65,21 @@ class Build:
index(self.user.institution, self.uuid, snap)
def create_annotations(self):
annotation = Annotation.objects.filter(
prop = SystemProperty.objects.filter(
uuid=self.uuid,
owner=self.user.institution,
type=Annotation.Type.SYSTEM,
)
if annotation:
txt = "Warning: Snapshot %s already registered (annotation exists)"
if prop:
txt = "Warning: Snapshot %s already registered (property exists)"
logger.warning(txt, self.uuid)
return
for k, v in self.build.algorithms.items():
Annotation.objects.create(
SystemProperty.objects.create(
uuid=self.uuid,
owner=self.user.institution,
user=self.user,
type=Annotation.Type.SYSTEM,
key=k,
value=self.sign(v)
)
@ -95,163 +93,3 @@ class Build:
phid = self.sign(json.dumps(self.build.get_doc()))
register_device_dlt(chid, phid, self.uuid, self.user)
register_passport_dlt(chid, phid, self.uuid, self.user)
class Build2:
def __init__(self, evidence_json, user, check=False):
if evidence_json.get("data",{}).get("lshw"):
if evidence_json.get("software") == "workbench-script":
return legacy_parse.Build(evidence_json, user, check=check)
self.evidence = evidence_json.copy()
self.json = evidence_json.copy()
if evidence_json.get("credentialSubject"):
self.json.update(evidence_json["credentialSubject"])
if evidence_json.get("evidence"):
self.json["data"] = {}
for ev in evidence_json["evidence"]:
k = ev.get("operation")
if not k:
continue
self.json["data"][k] = ev.get("output")
self.uuid = self.json['uuid']
self.user = user
self.hid = None
self.chid = None
self.phid = self.get_signature(self.json)
self.generate_chids()
if check:
return
self.index()
self.create_properties()
if settings.DPP:
self.register_device_dlt()
def index(self):
snap = json.dumps(self.evidence)
index(self.user.institution, self.uuid, snap)
def generate_chids(self):
self.algorithms = {
'hidalgo1': self.get_hid_14(),
'legacy_dpp': self.get_chid_dpp(),
}
def get_hid_14(self):
if self.json.get("software") == "workbench-script":
hid = self.get_hid(self.json)
else:
device = self.json['device']
manufacturer = device.get("manufacturer", '')
model = device.get("model", '')
chassis = device.get("chassis", '')
serial_number = device.get("serialNumber", '')
sku = device.get("sku", '')
hid = f"{manufacturer}{model}{chassis}{serial_number}{sku}"
self.chid = hashlib.sha3_256(hid.encode()).hexdigest()
return self.chid
def get_chid_dpp(self):
if self.json.get("software") == "workbench-script":
device = ParseSnapshot(self.json).device
else:
device = self.json['device']
hid = self.get_id_hw_dpp(device)
self.chid = hashlib.sha3_256(hid.encode("utf-8")).hexdigest()
return self.chid
def get_id_hw_dpp(self, d):
manufacturer = d.get("manufacturer", '')
model = d.get("model", '')
chassis = d.get("chassis", '')
serial_number = d.get("serialNumber", '')
sku = d.get("sku", '')
typ = d.get("type", '')
version = d.get("version", '')
return f"{manufacturer}{model}{chassis}{serial_number}{sku}{typ}{version}"
def get_phid(self):
if self.json.get("software") == "workbench-script":
data = ParseSnapshot(self.json)
self.device = data.device
self.components = data.components
else:
self.device = self.json.get("device")
self.components = self.json.get("components", [])
self.device.pop("actions", None)
for c in self.components:
c.pop("actions", None)
device = self.get_id_hw_dpp(self.device)
components = sorted(self.components, key=lambda x: x.get("type"))
doc = [("computer", device)]
for c in components:
doc.append((c.get("type"), self.get_id_hw_dpp(c)))
return doc
def create_properties(self):
property = SystemProperty.objects.filter(
uuid=self.uuid,
owner=self.user.institution,
)
if property:
txt = "Warning: Snapshot %s already registered (property exists)"
logger.warning(txt, self.uuid)
return
for k, v in self.algorithms.items():
SystemProperty.objects.create(
uuid=self.uuid,
owner=self.user.institution,
user=self.user,
key=k,
value=v
)
def get_hid(self, snapshot):
try:
self.inxi = self.json["data"]["inxi"]
if isinstance(self.inxi, str):
self.inxi = json.loads(self.inxi)
except Exception:
logger.error("No inxi in snapshot %s", self.uuid)
return ""
machine = get_inxi_key(self.inxi, 'Machine')
for m in machine:
system = get_inxi(m, "System")
if system:
manufacturer = system
model = get_inxi(m, "product")
serial_number = get_inxi(m, "serial")
chassis = get_inxi(m, "Type")
else:
sku = get_inxi(m, "part-nu")
mac = get_mac(self.inxi) or ""
if not mac:
txt = "Could not retrieve MAC address in snapshot %s"
logger.warning(txt, snapshot['uuid'])
return f"{manufacturer}{model}{chassis}{serial_number}{sku}"
return f"{manufacturer}{model}{chassis}{serial_number}{sku}{mac}"
def get_signature(self, doc):
return hashlib.sha3_256(json.dumps(doc).encode()).hexdigest()
def register_device_dlt(self):
chid = self.algorithms.get('legacy_dpp')
phid = self.get_signature(self.get_phid())
register_device_dlt(chid, phid, self.uuid, self.user)
register_passport_dlt(chid, phid, self.uuid, self.user)