add parsing components with new workbench

This commit is contained in:
Cayo Puigdefabregas 2024-09-25 12:51:08 +02:00
parent bab540187c
commit 54ef0bb41c
11 changed files with 599 additions and 15 deletions

View file

@ -155,3 +155,10 @@ class Device:
self.get_last_evidence() self.get_last_evidence()
return self.last_evidence.get_model() return self.last_evidence.get_model()
@property
def components(self):
if not self.last_evidence:
self.get_last_evidence()
return self.last_evidence.get_components()

View file

@ -173,7 +173,7 @@
<div class="tab-pane fade profile-overview" id="components"> <div class="tab-pane fade profile-overview" id="components">
<h5 class="card-title">Components last evidence</h5> <h5 class="card-title">Components last evidence</h5>
<div class="list-group col-6"> <div class="list-group col-6">
{% for c in object.last_evidence.doc.components %} {% for c in object.components %}
<div class="list-group-item"> <div class="list-group-item">
<div class="d-flex w-100 justify-content-between"> <div class="d-flex w-100 justify-content-between">
<h5 class="mb-1">{{ c.type }}</h5> <h5 class="mb-1">{{ c.type }}</h5>

View file

@ -0,0 +1,25 @@
from pathlib import Path
from pint import UnitRegistry
# Sets up the unit handling
unit_registry = Path(__file__).parent / 'unit_registry'
unit = UnitRegistry()
unit.load_definitions(str(unit_registry / 'quantities.txt'))
TB = unit.TB
GB = unit.GB
MB = unit.MB
Mbs = unit.Mbit / unit.s
MBs = unit.MB / unit.s
Hz = unit.Hz
GHz = unit.GHz
MHz = unit.MHz
Inch = unit.inch
mAh = unit.hour * unit.mA
mV = unit.mV
base2 = UnitRegistry()
base2.load_definitions(str(unit_registry / 'base2.quantities.txt'))
GiB = base2.GiB

View file

@ -5,6 +5,7 @@ from django.db import models
from utils.constants import STR_SM_SIZE, STR_EXTEND_SIZE, CHASSIS_DH from utils.constants import STR_SM_SIZE, STR_EXTEND_SIZE, CHASSIS_DH
from evidence.xapian import search from evidence.xapian import search
from evidence.parse_details import ParseSnapshot
from user.models import Institution from user.models import Institution
@ -35,6 +36,8 @@ class Evidence:
self.created = None self.created = None
self.dmi = None self.dmi = None
self.annotations = [] self.annotations = []
self.components = []
self.default = "n/a"
self.get_owner() self.get_owner()
self.get_time() self.get_time()
@ -60,12 +63,11 @@ class Evidence:
for xa in matches: for xa in matches:
self.doc = json.loads(xa.document.get_data()) self.doc = json.loads(xa.document.get_data())
if self.doc.get("software") == "EreuseWorkbench": if self.doc.get("software") == "EreuseWorkbench":
dmidecode_raw = self.doc["data"]["dmidecode"] dmidecode_raw = self.doc["data"]["dmidecode"]
self.dmi = DMIParse(dmidecode_raw) self.dmi = DMIParse(dmidecode_raw)
def get_time(self): def get_time(self):
if not self.doc: if not self.doc:
self.get_doc() self.get_doc()
@ -74,38 +76,43 @@ class Evidence:
if not self.created: if not self.created:
self.created = self.annotations.last().created self.created = self.annotations.last().created
def components(self): def get_components(self):
return self.doc.get('components', []) if self.doc.get("software") != "EreuseWorkbench":
return self.doc.get('components', [])
self.set_components()
return self.components
def get_manufacturer(self): def get_manufacturer(self):
if self.doc.get("software") != "EreuseWorkbench": if self.doc.get("software") != "EreuseWorkbench":
return self.doc['device']['manufacturer'] return self.doc['device']['manufacturer']
return self.dmi.manufacturer().strip() return self.dmi.manufacturer().strip()
def get_model(self): def get_model(self):
if self.doc.get("software") != "EreuseWorkbench": if self.doc.get("software") != "EreuseWorkbench":
return self.doc['device']['model'] return self.doc['device']['model']
return self.dmi.model().strip() return self.dmi.model().strip()
def get_chassis(self): def get_chassis(self):
if self.doc.get("software") != "EreuseWorkbench": if self.doc.get("software") != "EreuseWorkbench":
return self.doc['device']['model'] return self.doc['device']['model']
chassis = self.dmi.get("Chassis")[0].get("Type", '_virtual') chassis = self.dmi.get("Chassis")[0].get("Type", '_virtual')
lower_type = chassis.lower() lower_type = chassis.lower()
for k, v in CHASSIS_DH.items(): for k, v in CHASSIS_DH.items():
if lower_type in v: if lower_type in v:
return k return k
return "" return ""
@classmethod @classmethod
def get_all(cls, user): def get_all(cls, user):
return Annotation.objects.filter( return Annotation.objects.filter(
owner=user.institution, owner=user.institution,
type=Annotation.Type.SYSTEM, type=Annotation.Type.SYSTEM,
).order_by("-created").values_list("uuid", flat=True).distinct() ).order_by("-created").values_list("uuid", flat=True).distinct()
def set_components(self):
snapshot = ParseSnapshot(self.doc).snapshot_json
self.components = snapshot['components']

View file

@ -6,7 +6,7 @@ import hashlib
from datetime import datetime from datetime import datetime
from dmidecode import DMIParse from dmidecode import DMIParse
from evidence.xapian import search, index from evidence.xapian import search, index
from evidence.models import Evidence, Annotation from evidence.models import Annotation
from utils.constants import ALGOS, CHASSIS_DH from utils.constants import ALGOS, CHASSIS_DH

492
evidence/parse_details.py Normal file
View file

@ -0,0 +1,492 @@
import json
import numpy as np
from datetime import datetime
from dmidecode import DMIParse
from evidence import base2, unit
from utils.constants import CHASSIS_DH, DATASTORAGEINTERFACE
def get_lshw_child(child, nets, component):
if child.get('id') == component:
nets.append(child)
if child.get('children'):
[get_lshw_child(x, nets, component) for x in child['children']]
class ParseSnapshot:
def __init__(self, snapshot, default="n/a"):
self.default = default
self.dmidecode_raw = snapshot["data"].get("dmidecode", "{}")
self.smart_raw = snapshot["data"].get("disks", [])
self.hwinfo_raw = snapshot["data"].get("hwinfo", "")
self.lshw_raw = snapshot["data"].get("lshw", {}) or {}
self.lscpi_raw = snapshot["data"].get("lspci", "")
self.device = {"actions": []}
self.components = []
self.monitors = []
self.dmi = DMIParse(self.dmidecode_raw)
self.smart = self.loads(self.smart_raw)
self.lshw = self.loads(self.lshw_raw)
self.hwinfo = self.parse_hwinfo()
self.set_computer()
self.get_hwinfo_monitors()
self.set_components()
self.snapshot_json = {
"type": "Snapshot",
"device": self.device,
"software": snapshot["software"],
"components": self.components,
"uuid": snapshot['uuid'],
"version": snapshot['version'],
"endTime": snapshot["timestamp"],
"elapsed": 1,
}
def set_computer(self):
self.device['manufacturer'] = self.dmi.manufacturer().strip()
self.device['model'] = self.dmi.model().strip()
self.device['serialNumber'] = self.dmi.serial_number()
self.device['type'] = self.get_type()
self.device['sku'] = self.get_sku()
self.device['version'] = self.get_version()
self.device['system_uuid'] = self.get_uuid()
self.device['family'] = self.get_family()
self.device['chassis'] = self.get_chassis_dh()
def set_components(self):
self.get_cpu()
self.get_ram()
self.get_mother_board()
self.get_graphic()
self.get_data_storage()
self.get_display()
self.get_sound_card()
self.get_networks()
def get_cpu(self):
for cpu in self.dmi.get('Processor'):
serial = cpu.get('Serial Number')
if serial == 'Not Specified' or not serial:
serial = cpu.get('ID').replace(' ', '')
self.components.append(
{
"actions": [],
"type": "Processor",
"speed": self.get_cpu_speed(cpu),
"cores": int(cpu.get('Core Count', 1)),
"model": cpu.get('Version'),
"threads": int(cpu.get('Thread Count', 1)),
"manufacturer": cpu.get('Manufacturer'),
"serialNumber": serial,
"generation": None,
"brand": cpu.get('Family'),
"address": self.get_cpu_address(cpu),
}
)
def get_ram(self):
for ram in self.dmi.get("Memory Device"):
if ram.get('size') == 'No Module Installed':
continue
if not ram.get("Speed"):
continue
self.components.append(
{
"actions": [],
"type": "RamModule",
"size": self.get_ram_size(ram),
"speed": self.get_ram_speed(ram),
"manufacturer": ram.get("Manufacturer", self.default),
"serialNumber": ram.get("Serial Number", self.default),
"interface": ram.get("Type", "DDR"),
"format": ram.get("Form Factor", "DIMM"),
"model": ram.get("Part Number", self.default),
}
)
def get_mother_board(self):
for moder_board in self.dmi.get("Baseboard"):
self.components.append(
{
"actions": [],
"type": "Motherboard",
"version": moder_board.get("Version"),
"serialNumber": moder_board.get("Serial Number", "").strip(),
"manufacturer": moder_board.get("Manufacturer", "").strip(),
"biosDate": self.get_bios_date(),
"ramMaxSize": self.get_max_ram_size(),
"ramSlots": len(self.dmi.get("Memory Device")),
"slots": self.get_ram_slots(),
"model": moder_board.get("Product Name", "").strip(),
"firewire": self.get_firmware_num(),
"pcmcia": self.get_pcmcia_num(),
"serial": self.get_serial_num(),
"usb": self.get_usb_num(),
}
)
def get_graphic(self):
displays = []
get_lshw_child(self.lshw, displays, 'display')
for c in displays:
if not c['configuration'].get('driver', None):
continue
self.components.append(
{
"actions": [],
"type": "GraphicCard",
"memory": self.get_memory_video(c),
"manufacturer": c.get("vendor", self.default),
"model": c.get("product", self.default),
"serialNumber": c.get("serial", self.default),
}
)
def get_memory_video(self, c):
# get info of lspci
# pci_id = c['businfo'].split('@')[1]
# lspci.get(pci_id) | grep size
# lspci -v -s 00:02.0
return None
def get_data_storage(self):
for sm in self.smart:
if sm.get('smartctl', {}).get('exit_status') == 1:
continue
model = sm.get('model_name')
manufacturer = None
if model and len(model.split(" ")) > 1:
mm = model.split(" ")
model = mm[-1]
manufacturer = " ".join(mm[:-1])
self.components.append(
{
"actions": self.sanitize(sm),
"type": self.get_data_storage_type(sm),
"model": model,
"manufacturer": manufacturer,
"serialNumber": sm.get('serial_number'),
"size": self.get_data_storage_size(sm),
"variant": sm.get("firmware_version"),
"interface": self.get_data_storage_interface(sm),
}
)
def sanitize(self, action):
return []
def get_networks(self):
networks = []
get_lshw_child(self.lshw, networks, 'network')
for c in networks:
capacity = c.get('capacity')
units = c.get('units')
speed = None
if capacity and units:
speed = unit.Quantity(capacity, units).to('Mbit/s').m
wireless = bool(c.get('configuration', {}).get('wireless', False))
self.components.append(
{
"actions": [],
"type": "NetworkAdapter",
"model": c.get('product'),
"manufacturer": c.get('vendor'),
"serialNumber": c.get('serial'),
"speed": speed,
"variant": c.get('version', 1),
"wireless": wireless,
}
)
def get_sound_card(self):
multimedias = []
get_lshw_child(self.lshw, multimedias, 'multimedia')
for c in multimedias:
self.components.append(
{
"actions": [],
"type": "SoundCard",
"model": c.get('product'),
"manufacturer": c.get('vendor'),
"serialNumber": c.get('serial'),
}
)
def get_display(self): # noqa: C901
TECHS = 'CRT', 'TFT', 'LED', 'PDP', 'LCD', 'OLED', 'AMOLED'
for c in self.monitors:
resolution_width, resolution_height = (None,) * 2
refresh, serial, model, manufacturer, size = (None,) * 5
year, week, production_date = (None,) * 3
for x in c:
if "Vendor: " in x:
manufacturer = x.split('Vendor: ')[-1].strip()
if "Model: " in x:
model = x.split('Model: ')[-1].strip()
if "Serial ID: " in x:
serial = x.split('Serial ID: ')[-1].strip()
if " Resolution: " in x:
rs = x.split(' Resolution: ')[-1].strip()
if 'x' in rs:
resolution_width, resolution_height = [
int(r) for r in rs.split('x')
]
if "Frequencies: " in x:
try:
refresh = int(float(x.split(',')[-1].strip()[:-3]))
except Exception:
pass
if 'Year of Manufacture' in x:
year = x.split(': ')[1]
if 'Week of Manufacture' in x:
week = x.split(': ')[1]
if "Size: " in x:
size = self.get_size_monitor(x)
technology = next((t for t in TECHS if t in c[0]), None)
if year and week:
d = '{} {} 0'.format(year, week)
production_date = datetime.strptime(d, '%Y %W %w').isoformat()
self.components.append(
{
"actions": [],
"type": "Display",
"model": model,
"manufacturer": manufacturer,
"serialNumber": serial,
'size': size,
'resolutionWidth': resolution_width,
'resolutionHeight': resolution_height,
"productionDate": production_date,
'technology': technology,
'refreshRate': refresh,
}
)
def get_hwinfo_monitors(self):
for c in self.hwinfo:
monitor = None
external = None
for x in c:
if 'Hardware Class: monitor' in x:
monitor = c
if 'Driver Info' in x:
external = c
if monitor and not external:
self.monitors.append(c)
def get_size_monitor(self, x):
i = 1 / 25.4
t = x.split('Size: ')[-1].strip()
tt = t.split('mm')
if not tt:
return 0
sizes = tt[0].strip()
if 'x' not in sizes:
return 0
w, h = [int(x) for x in sizes.split('x')]
return np.sqrt(w**2 + h**2) * i
def get_cpu_address(self, cpu):
default = 64
for ch in self.lshw.get('children', []):
for c in ch.get('children', []):
if c['class'] == 'processor':
return c.get('width', default)
return default
def get_usb_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "USB" in u.get("Port Type", "").upper()
]
)
def get_serial_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "SERIAL" in u.get("Port Type", "").upper()
]
)
def get_firmware_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "FIRMWARE" in u.get("Port Type", "").upper()
]
)
def get_pcmcia_num(self):
return len(
[
u
for u in self.dmi.get("Port Connector")
if "PCMCIA" in u.get("Port Type", "").upper()
]
)
def get_bios_date(self):
return self.dmi.get("BIOS")[0].get("Release Date", self.default)
def get_firmware(self):
return self.dmi.get("BIOS")[0].get("Firmware Revision", '1')
def get_max_ram_size(self):
size = 0
for slot in self.dmi.get("Physical Memory Array"):
capacity = slot.get("Maximum Capacity", '0').split(" ")[0]
size += int(capacity)
return size
def get_ram_slots(self):
slots = 0
for x in self.dmi.get("Physical Memory Array"):
slots += int(x.get("Number Of Devices", 0))
return slots
def get_ram_size(self, ram):
try:
memory = ram.get("Size", "0")
memory = memory.split(' ')
if len(memory) > 1:
size = int(memory[0])
units = memory[1]
return base2.Quantity(size, units).to('MiB').m
return int(size.split(" ")[0])
except Exception as err:
logger.error("get_ram_size error: {}".format(err))
return 0
def get_ram_speed(self, ram):
size = ram.get("Speed", "0")
return int(size.split(" ")[0])
def get_cpu_speed(self, cpu):
speed = cpu.get('Max Speed', "0")
return float(speed.split(" ")[0]) / 1024
def get_sku(self):
return self.dmi.get("System")[0].get("SKU Number", self.default).strip()
def get_version(self):
return self.dmi.get("System")[0].get("Version", self.default).strip()
def get_uuid(self):
return self.dmi.get("System")[0].get("UUID", '').strip()
def get_family(self):
return self.dmi.get("System")[0].get("Family", '')
def get_chassis(self):
return self.dmi.get("Chassis")[0].get("Type", '_virtual')
def get_type(self):
chassis_type = self.get_chassis()
return self.translation_to_devicehub(chassis_type)
def translation_to_devicehub(self, original_type):
lower_type = original_type.lower()
CHASSIS_TYPE = {
'Desktop': [
'desktop',
'low-profile',
'tower',
'docking',
'all-in-one',
'pizzabox',
'mini-tower',
'space-saving',
'lunchbox',
'mini',
'stick',
],
'Laptop': [
'portable',
'laptop',
'convertible',
'tablet',
'detachable',
'notebook',
'handheld',
'sub-notebook',
],
'Server': ['server'],
'Computer': ['_virtual'],
}
for k, v in CHASSIS_TYPE.items():
if lower_type in v:
return k
return self.default
def get_chassis_dh(self):
chassis = self.get_chassis()
lower_type = chassis.lower()
for k, v in CHASSIS_DH.items():
if lower_type in v:
return k
return self.default
def get_data_storage_type(self, x):
# TODO @cayop add more SSDS types
SSDS = ["nvme"]
SSD = 'SolidStateDrive'
HDD = 'HardDrive'
type_dev = x.get('device', {}).get('type')
trim = x.get('trim', {}).get("supported") in [True, "true"]
return SSD if type_dev in SSDS or trim else HDD
def get_data_storage_interface(self, x):
interface = x.get('device', {}).get('protocol', 'ATA')
if interface.upper() in DATASTORAGEINTERFACE:
return interface.upper()
txt = "Sid: {}, interface {} is not in DataStorageInterface Enum".format(
self.sid, interface
)
self.errors("{}".format(err))
def get_data_storage_size(self, x):
total_capacity = x.get('user_capacity', {}).get('bytes')
if not total_capacity:
return 1
# convert bytes to Mb
return total_capacity / 1024**2
def parse_hwinfo(self):
hw_blocks = self.hwinfo_raw.split("\n\n")
return [x.split("\n") for x in hw_blocks]
def loads(self, x):
if isinstance(x, str):
return json.loads(x)
return x
def errors(self, txt=None):
if not txt:
return self._errors
logger.error(txt)
self._errors.append(txt)

View file

@ -0,0 +1,4 @@
K = KiB = k = kb = KB
M = MiB = m = mb = MB
G = GiB = g = gb = GB
T = TiB = t = tb = TB

View file

@ -0,0 +1,9 @@
HZ = hertz = hz
KHZ = kilohertz = khz
MHZ = megahertz = mhz
GHZ = gigahertz = ghz
B = byte = b = UNIT = unit
KB = kilobyte = kb = K = k
MB = megabyte = mb = M = m
GB = gigabyte = gb = G = g
T = terabyte = tb = T = t

View file

@ -10,3 +10,4 @@ pandas==2.2.2
xlrd==2.0.1 xlrd==2.0.1
odfpy==1.4.1 odfpy==1.4.1
pytz==2024.2 pytz==2024.2
Pint==0.24.3

View file

@ -1,6 +1,37 @@
from django.shortcuts import render from django.shortcuts import render
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from dashboard.mixins import InventaryMixin, DetailsMixin from dashboard.mixins import DashboardView
class ProfileView(DashboardView):
template_name = "profile.html"
subtitle = _('My personal data')
icon = 'bi bi-person-gear'
fields = ('first_name', 'last_name', 'email')
success_url = reverse_lazy('idhub:user_profile')
model = User
def get_queryset(self, **kwargs):
queryset = Membership.objects.select_related('user').filter(
user=self.request.user)
return queryset
def get_object(self):
return self.request.user
def get_form(self):
form = super().get_form()
return form
def form_valid(self, form):
return super().form_valid(form)
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context.update({
'lang': self.request.LANGUAGE_CODE,
})
return context

View file

@ -38,3 +38,11 @@ CHASSIS_DH = {
'Tablet': {'tablet'}, 'Tablet': {'tablet'},
'Virtual': {'_virtual'}, 'Virtual': {'_virtual'},
} }
DATASTORAGEINTERFACE = [
'ATA',
'USB',
'PCI',
'NVME',
]