devicehub-django/evidence/parse.py

132 lines
4.1 KiB
Python

import json
import hashlib
import logging
from evidence.models import Annotation
from evidence.xapian import index
from dpp.api_dlt import register_device_dlt, register_passport_dlt
from evidence.parse_details import get_inxi_key, get_inxi
logger = logging.getLogger('django')
def get_mac(inxi):
nets = get_inxi_key(inxi, "Network")
networks = [(nets[i], nets[i + 1]) for i in range(0, len(nets) - 1, 2)]
for n, iface in networks:
if get_inxi(n, "port"):
return get_inxi(iface, 'mac')
class Build:
def __init__(self, evidence_json, user, check=False):
self.evidence = evidence_json.copy()
self.json = evidence_json.copy()
if evidence_json.get("credentialSubject"):
self.json.update(evidence_json["credentialSubject"])
if evidence_json.get("evidence"):
self.json["data"] = {}
for ev in evidence_json["evidence"]:
k = ev.get("operation")
if not k:
continue
self.json["data"][k] = ev.get("output")
self.uuid = self.json['uuid']
self.user = user
self.hid = None
self.chid = None
self.phid = self.get_signature(self.json)
self.generate_chids()
if check:
return
self.index()
self.create_annotations()
self.register_device_dlt()
def index(self):
snap = json.dumps(self.evidence)
index(self.user.institution, self.uuid, snap)
def generate_chids(self):
self.algorithms = {
'hidalgo1': self.get_hid_14(),
}
def get_hid_14(self):
if self.json.get("software") == "workbench-script":
hid = self.get_hid(self.json)
else:
device = self.json['device']
manufacturer = device.get("manufacturer", '')
model = device.get("model", '')
chassis = device.get("chassis", '')
serial_number = device.get("serialNumber", '')
sku = device.get("sku", '')
hid = f"{manufacturer}{model}{chassis}{serial_number}{sku}"
self.chid = hashlib.sha3_256(hid.encode()).hexdigest()
return self.chid
def create_annotations(self):
annotation = Annotation.objects.filter(
uuid=self.uuid,
owner=self.user.institution,
type=Annotation.Type.SYSTEM,
)
if annotation:
txt = "Warning: Snapshot %s already registered (annotation exists)"
logger.warning(txt, self.uuid)
return
for k, v in self.algorithms.items():
Annotation.objects.create(
uuid=self.uuid,
owner=self.user.institution,
user=self.user,
type=Annotation.Type.SYSTEM,
key=k,
value=v
)
def get_hid(self, snapshot):
try:
self.inxi = self.json["data"]["inxi"]
if isinstance(self.inxi, str):
self.inxi = json.loads(self.inxi)
except Exception:
logger.error("No inxi in snapshot %s", self.uuid)
return ""
machine = get_inxi_key(self.inxi, 'Machine')
for m in machine:
system = get_inxi(m, "System")
if system:
manufacturer = system
model = get_inxi(m, "product")
serial_number = get_inxi(m, "serial")
chassis = get_inxi(m, "Type")
else:
sku = get_inxi(m, "part-nu")
mac = get_mac(self.inxi) or ""
if not mac:
txt = "Could not retrieve MAC address in snapshot %s"
logger.warning(txt, snapshot['uuid'])
return f"{manufacturer}{model}{chassis}{serial_number}{sku}"
return f"{manufacturer}{model}{chassis}{serial_number}{sku}{mac}"
def get_signature(self, doc):
return hashlib.sha3_256(json.dumps(doc).encode()).hexdigest()
def register_device_dlt(self):
register_device_dlt(self.chid, self.phid, self.uuid, self.user)
register_passport_dlt(self.chid, self.phid, self.uuid, self.user)