2024-07-11 15:40:45 +00:00
|
|
|
import json
|
|
|
|
import hashlib
|
2024-10-25 15:36:13 +00:00
|
|
|
import logging
|
2024-07-01 10:17:23 +00:00
|
|
|
|
2024-09-18 16:01:46 +00:00
|
|
|
from dmidecode import DMIParse
|
2024-10-21 16:39:31 +00:00
|
|
|
from json_repair import repair_json
|
2024-11-28 11:21:26 +00:00
|
|
|
from evidence.parse_details import get_lshw_child, ParseSnapshot
|
2024-10-21 16:39:31 +00:00
|
|
|
|
2024-09-25 10:51:08 +00:00
|
|
|
from evidence.models import Annotation
|
2024-10-04 15:32:53 +00:00
|
|
|
from evidence.xapian import index
|
2024-11-18 18:37:08 +00:00
|
|
|
from dpp.api_dlt import register_device_dlt, register_passport_dlt
|
2024-10-25 15:36:13 +00:00
|
|
|
from utils.constants import CHASSIS_DH
|
|
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger('django')
|
2024-07-11 15:40:45 +00:00
|
|
|
|
2024-11-18 18:37:08 +00:00
|
|
|
|
2024-11-09 19:52:26 +00:00
|
|
|
def get_mac(lshw):
|
|
|
|
try:
|
|
|
|
if type(lshw) is dict:
|
|
|
|
hw = lshw
|
|
|
|
else:
|
|
|
|
hw = json.loads(lshw)
|
|
|
|
except json.decoder.JSONDecodeError:
|
|
|
|
hw = json.loads(repair_json(lshw))
|
|
|
|
|
2024-11-11 06:41:08 +00:00
|
|
|
nets = []
|
|
|
|
get_lshw_child(hw, nets, 'network')
|
|
|
|
|
|
|
|
nets_sorted = sorted(nets, key=lambda x: x['businfo'])
|
2024-11-09 19:52:26 +00:00
|
|
|
|
|
|
|
if nets_sorted:
|
|
|
|
mac = nets_sorted[0]['serial']
|
|
|
|
logger.debug("The snapshot has the following MAC: %s" , mac)
|
|
|
|
return mac
|
|
|
|
|
2024-07-11 15:40:45 +00:00
|
|
|
|
2024-09-20 16:05:29 +00:00
|
|
|
|
2024-07-11 15:40:45 +00:00
|
|
|
class Build:
|
2024-07-31 11:28:46 +00:00
|
|
|
def __init__(self, evidence_json, user, check=False):
|
2024-07-26 15:59:34 +00:00
|
|
|
self.json = evidence_json
|
2024-07-18 15:21:22 +00:00
|
|
|
self.uuid = self.json['uuid']
|
2024-07-11 15:40:45 +00:00
|
|
|
self.user = user
|
|
|
|
self.hid = None
|
2024-11-18 18:37:08 +00:00
|
|
|
self.chid = None
|
|
|
|
self.phid = self.get_signature(self.json)
|
2024-07-31 11:28:46 +00:00
|
|
|
self.generate_chids()
|
|
|
|
|
|
|
|
if check:
|
|
|
|
return
|
2024-07-11 15:40:45 +00:00
|
|
|
|
2024-07-15 14:23:14 +00:00
|
|
|
self.index()
|
2024-07-18 15:21:22 +00:00
|
|
|
self.create_annotations()
|
2024-11-18 18:37:08 +00:00
|
|
|
self.register_device_dlt()
|
2024-07-11 15:40:45 +00:00
|
|
|
|
2024-07-15 14:23:14 +00:00
|
|
|
def index(self):
|
|
|
|
snap = json.dumps(self.json)
|
2024-10-04 15:32:53 +00:00
|
|
|
index(self.user.institution, self.uuid, snap)
|
2024-07-15 14:23:14 +00:00
|
|
|
|
2024-07-31 11:28:46 +00:00
|
|
|
def generate_chids(self):
|
|
|
|
self.algorithms = {
|
|
|
|
'hidalgo1': self.get_hid_14(),
|
2024-11-28 11:21:26 +00:00
|
|
|
'legacy_dpp': self.get_chid_dpp(),
|
2024-07-31 11:28:46 +00:00
|
|
|
}
|
|
|
|
|
2024-07-15 14:23:14 +00:00
|
|
|
def get_hid_14(self):
|
2024-10-21 09:24:09 +00:00
|
|
|
if self.json.get("software") == "workbench-script":
|
2024-09-18 16:01:46 +00:00
|
|
|
hid = self.get_hid(self.json)
|
|
|
|
else:
|
|
|
|
device = self.json['device']
|
|
|
|
manufacturer = device.get("manufacturer", '')
|
|
|
|
model = device.get("model", '')
|
|
|
|
chassis = device.get("chassis", '')
|
|
|
|
serial_number = device.get("serialNumber", '')
|
|
|
|
sku = device.get("sku", '')
|
|
|
|
hid = f"{manufacturer}{model}{chassis}{serial_number}{sku}"
|
2024-10-25 15:36:13 +00:00
|
|
|
|
|
|
|
|
2024-11-18 18:37:08 +00:00
|
|
|
self.chid = hashlib.sha3_256(hid.encode()).hexdigest()
|
|
|
|
return self.chid
|
2024-07-15 14:23:14 +00:00
|
|
|
|
2024-11-28 11:21:26 +00:00
|
|
|
def get_chid_dpp(self):
|
|
|
|
if self.json.get("software") == "workbench-script":
|
|
|
|
dmidecode_raw = self.json["data"]["dmidecode"]
|
|
|
|
dmi = DMIParse(dmidecode_raw)
|
|
|
|
|
|
|
|
manufacturer = dmi.manufacturer().strip()
|
|
|
|
model = dmi.model().strip()
|
|
|
|
chassis = self.get_chassis_dh()
|
|
|
|
serial_number = dmi.serial_number()
|
|
|
|
sku = self.get_sku()
|
|
|
|
typ = chassis
|
|
|
|
version = self.get_version()
|
|
|
|
hid = f"{manufacturer}{model}{chassis}{serial_number}{sku}{typ}{version}"
|
|
|
|
else:
|
|
|
|
device = self.json['device']
|
|
|
|
hid = self.get_id_hw_dpp(device)
|
|
|
|
|
|
|
|
self.chid = hashlib.sha3_256(hid.encode("utf-8")).hexdigest()
|
|
|
|
return self.chid
|
|
|
|
|
|
|
|
def get_id_hw_dpp(self, d):
|
|
|
|
manufacturer = d.get("manufacturer", '')
|
|
|
|
model = d.get("model", '')
|
|
|
|
chassis = d.get("chassis", '')
|
|
|
|
serial_number = d.get("serialNumber", '')
|
|
|
|
sku = d.get("sku", '')
|
|
|
|
typ = d.get("type", '')
|
|
|
|
version = d.get("version", '')
|
|
|
|
|
|
|
|
return f"{manufacturer}{model}{chassis}{serial_number}{sku}{typ}{version}"
|
|
|
|
|
|
|
|
def get_phid(self):
|
|
|
|
if self.json.get("software") == "workbench-script":
|
|
|
|
data = ParseSnapshot(self.json)
|
|
|
|
self.device = data.device
|
|
|
|
self.components = data.components
|
|
|
|
else:
|
|
|
|
self.device = self.json.get("device")
|
2024-11-28 14:25:57 +00:00
|
|
|
self.components = self.json.get("components", [])
|
2024-11-28 11:21:26 +00:00
|
|
|
|
|
|
|
device = self.get_id_hw_dpp(self.device)
|
|
|
|
components = self.json.get(self.components)
|
|
|
|
components = sorted(components, key=lambda x: x.get("type"))
|
|
|
|
doc = [("computer", device)]
|
|
|
|
|
|
|
|
for c in components:
|
|
|
|
doc.append((c.get("type"), self.get_id_hw_dpp(c)))
|
|
|
|
|
|
|
|
return doc
|
|
|
|
|
2024-07-18 15:21:22 +00:00
|
|
|
def create_annotations(self):
|
2024-10-25 15:36:13 +00:00
|
|
|
annotation = Annotation.objects.filter(
|
|
|
|
uuid=self.uuid,
|
|
|
|
owner=self.user.institution,
|
|
|
|
type=Annotation.Type.SYSTEM,
|
|
|
|
)
|
|
|
|
|
|
|
|
if annotation:
|
2024-10-31 13:24:16 +00:00
|
|
|
txt = "Warning: Snapshot %s already registered (annotation exists)"
|
2024-10-31 09:14:02 +00:00
|
|
|
logger.warning(txt, self.uuid)
|
2024-10-25 15:36:13 +00:00
|
|
|
return
|
2024-07-18 15:21:22 +00:00
|
|
|
|
2024-07-31 11:28:46 +00:00
|
|
|
for k, v in self.algorithms.items():
|
2024-07-18 15:21:22 +00:00
|
|
|
Annotation.objects.create(
|
|
|
|
uuid=self.uuid,
|
2024-09-18 16:01:46 +00:00
|
|
|
owner=self.user.institution,
|
2024-10-04 15:32:53 +00:00
|
|
|
user=self.user,
|
2024-07-18 15:21:22 +00:00
|
|
|
type=Annotation.Type.SYSTEM,
|
|
|
|
key=k,
|
|
|
|
value=v
|
|
|
|
)
|
2024-09-18 16:01:46 +00:00
|
|
|
|
|
|
|
def get_chassis_dh(self):
|
|
|
|
chassis = self.get_chassis()
|
|
|
|
lower_type = chassis.lower()
|
|
|
|
for k, v in CHASSIS_DH.items():
|
|
|
|
if lower_type in v:
|
|
|
|
return k
|
|
|
|
return self.default
|
|
|
|
|
|
|
|
def get_sku(self):
|
|
|
|
return self.dmi.get("System")[0].get("SKU Number", "n/a").strip()
|
2024-10-25 15:36:13 +00:00
|
|
|
|
2024-09-18 16:01:46 +00:00
|
|
|
def get_chassis(self):
|
2024-11-28 14:25:57 +00:00
|
|
|
return self.dmi.get("Chassis")[0].get("Type", '_virtual') #
|
2024-09-18 16:01:46 +00:00
|
|
|
|
2024-11-28 11:21:26 +00:00
|
|
|
def get_version(self):
|
|
|
|
return self.dmi.get("System")[0].get("Verson", '_virtual')
|
|
|
|
|
2024-09-18 16:01:46 +00:00
|
|
|
def get_hid(self, snapshot):
|
|
|
|
dmidecode_raw = snapshot["data"]["dmidecode"]
|
|
|
|
self.dmi = DMIParse(dmidecode_raw)
|
|
|
|
|
|
|
|
manufacturer = self.dmi.manufacturer().strip()
|
|
|
|
model = self.dmi.model().strip()
|
|
|
|
chassis = self.get_chassis_dh()
|
|
|
|
serial_number = self.dmi.serial_number()
|
|
|
|
sku = self.get_sku()
|
2024-09-23 12:14:30 +00:00
|
|
|
|
|
|
|
if not snapshot["data"].get('lshw'):
|
|
|
|
return f"{manufacturer}{model}{chassis}{serial_number}{sku}"
|
2024-10-25 15:36:13 +00:00
|
|
|
|
2024-09-23 12:14:30 +00:00
|
|
|
lshw = snapshot["data"]["lshw"]
|
|
|
|
# mac = get_mac2(hwinfo_raw) or ""
|
|
|
|
mac = get_mac(lshw) or ""
|
2024-09-20 16:22:27 +00:00
|
|
|
if not mac:
|
2024-10-31 09:40:53 +00:00
|
|
|
txt = "Could not retrieve MAC address in snapshot %s"
|
|
|
|
logger.warning(txt, snapshot['uuid'])
|
2024-09-18 16:01:46 +00:00
|
|
|
|
2024-09-20 16:05:29 +00:00
|
|
|
return f"{manufacturer}{model}{chassis}{serial_number}{sku}{mac}"
|
2024-11-28 11:21:26 +00:00
|
|
|
|
2024-11-18 18:37:08 +00:00
|
|
|
def get_signature(self, doc):
|
|
|
|
return hashlib.sha3_256(json.dumps(doc).encode()).hexdigest()
|
|
|
|
|
|
|
|
def register_device_dlt(self):
|
2024-11-28 11:21:26 +00:00
|
|
|
chid = self.algorithms.get('legacy_dpp')
|
|
|
|
phid = self.get_signature(self.get_phid(self))
|
|
|
|
register_device_dlt(chid, phid, self.uuid, self.user)
|
|
|
|
register_passport_dlt(chid, phid, self.uuid, self.user)
|